summaryrefslogtreecommitdiffstats
path: root/src/software/test
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-11-20 10:07:41 +0100
committerMichal Minar <miminar@redhat.com>2013-11-29 09:26:56 +0100
commit85b821bbabf16b4f96a636ae19817941445c37b0 (patch)
treece5b6f373dac939869782cc49c65d6b9c8750255 /src/software/test
parent35847aa0b11566154f0db5e2ef3673bccb138211 (diff)
downloadopenlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.tar.gz
openlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.tar.xz
openlmi-providers-85b821bbabf16b4f96a636ae19817941445c37b0.zip
software: rewritten tests
Software tests now create custom testing repositories and packages. This makes them more robust and efficient (nothing needs to be downloaded).
Diffstat (limited to 'src/software/test')
-rw-r--r--src/software/test/README49
-rw-r--r--src/software/test/base.py191
-rw-r--r--src/software/test/package.py210
-rw-r--r--src/software/test/reposetup.py658
-rw-r--r--src/software/test/repository.py272
-rw-r--r--src/software/test/rpmcache.py541
-rwxr-xr-xsrc/software/test/run.py132
-rw-r--r--src/software/test/swbase.py288
-rw-r--r--src/software/test/test_hosted_software_collection.py30
-rw-r--r--src/software/test/test_hosted_software_identity_resource.py194
-rw-r--r--src/software/test/test_installed_software_identity.py332
-rw-r--r--src/software/test/test_member_of_software_collection.py240
-rw-r--r--src/software/test/test_resource_for_software_identity.py298
-rw-r--r--src/software/test/test_software_identity.py273
-rw-r--r--src/software/test/test_software_identity_checks.py209
-rw-r--r--src/software/test/test_software_identity_file_check.py793
-rw-r--r--src/software/test/test_software_identity_resource.py407
-rw-r--r--src/software/test/test_system_software_collection.py19
-rw-r--r--src/software/test/util.py8
19 files changed, 3404 insertions, 1740 deletions
diff --git a/src/software/test/README b/src/software/test/README
index cb1acce..4a059bd 100644
--- a/src/software/test/README
+++ b/src/software/test/README
@@ -2,38 +2,47 @@ Testing
=======
Tests must be run as root on machine with cimom broker.
There are number of environment variables, that affect test running:
- LMI_CIMOM_USERNAME
- LMI_CIMOM_PASSWORD
- LMI_CIMOM_BROKER
- LMI_CIMOM_URL
- LMI_RUN_DANGEROUS - valid values: 0, 1
- LMI_SOFTWARE_USE_CACHE - valid values: 0, 1
- - says, whether to use cache to download rpm packages, which will
- be used to speed up testing
- LMI_SOFTWARE_CACHE_DIR
- - directory, where to store rpm packages and other information
- - if not set and LMI_SOFTWARE_USE_CACHE == '1', a temporary
- directory will be created and deleted, when tests are
- complete
+ ``LMI_CIMOM_USERNAME``
+ ``LMI_CIMOM_PASSWORD``
+ ``LMI_CIMOM_BROKER``
+ ``LMI_CIMOM_URL``
+ ``LMI_RUN_DANGEROUS`` - valid values: 0, 1
+ ``LMI_RUN_TEDIOUS`` - valid values: 0, 1
+ ``LMI_SOFTWARE_DB_CACHE``
+ Where to store the file with information about generated testing
+ database. If not set, temporary file will be created and whole cache
+ will be removed after tests are complete. If set and the file does not
+ yet exist its created and user may choose if he wants to preserve
+ generated cache for another run (with ``LMI_SOFTWARE_CLEANUP_CACHE``).
+ If set and the file exists, database is reused, which speeds up the
+ intialization.
+ ``LMI_SOFTWARE_CLEANUP_CACHE`` - valid values: 0, 1
+ Whether to preserve generated reposities and database file (cache) for
+ next runs to speed them up. Defaults to False.
+ ``LMI_SOFTWARE_DB_CACHE`` must be set for this to have any effect.
+
+Dependencies
+------------
+ * @development-tools (rpmbuild, createrepo, etc.)
Preparation
-----------
-Tests are installed together with source in single python egg. They are
-accessible from ``lmi.test.software`` package.
+``lmi.test`` module must be on python path. The easiest way to get it there is
+to export ``PYTHONPATH`` environment variable like this: ::
+
+ export PYTHONPATH="${PATH_TO_GIT_ROOT}/src/python"
Running tests
-------------
-They can be run with a script ``lmi-test-software``.
-
Each test module can be run as a stand-alone script or with run.py script: ::
- ./run.py -c --cache-dir=/var/tmp
+ ./run.py
To pass any arguments to underlying ``unittest.main()`` function, append them
after "--" swith like this: ::
- ./run.py -c --cache-dir=/var/tmp -- -v TestSoftwarePackage
+ ./run.py -- -v TestSoftwarePackage
All environment variables defined above can be overriden by command-line
-arguments.
+arguments. See help message of ``run.py`` script.
diff --git a/src/software/test/base.py b/src/software/test/base.py
deleted file mode 100644
index 6b6f7f9..0000000
--- a/src/software/test/base.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Common utilities and base class for all software tests.
-"""
-
-import itertools
-import os
-import tempfile
-from subprocess import check_output
-
-from lmi.test.lmibase import LmiTestCase
-import repository
-import rpmcache
-
-def get_pkg_files(pkg):
- """
- Tries to make a the heterogenous and smallest set of test files from
- package.
- @param pkg must be installed package
- @return list of few files installed by pkg
- """
- cmd = ['rpm', '-ql', pkg.name]
- output = check_output(cmd)
- configs = set()
- docs = set()
- dirs = set()
- files = set()
- symlinks = set()
- for fpath in output.splitlines(): #pylint: disable=E1103
- if ( len(dirs) == 0
- and not os.path.islink(fpath)
- and os.path.isdir(fpath)):
- dirs.add(fpath)
- elif len(symlinks) == 0 and os.path.islink(fpath):
- symlinks.add(fpath)
- elif not os.path.islink(fpath) and os.path.isfile(fpath):
- if len(configs) == 0 and rpmcache.has_pkg_config_file(pkg, fpath):
- configs.add(fpath)
- elif len(docs) == 0 and rpmcache.has_pkg_doc_file(pkg, fpath):
- docs.add(fpath)
- elif len(files) == 0:
- files.add(fpath)
- out = list(configs) + list(docs) + list(dirs) + list(symlinks)
- if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0:
- out += list(files)
- return out
-
-class SoftwareBaseTestCase(LmiTestCase): #pylint: disable=R0904
- """
- Base class for all LMI Software test classes.
- """
-
- CLASS_NAME = "Define in subclass"
- KEYS = tuple()
-
- # will be filled when first needed
- # it's a dictionary with items (pkg_name, [file_path1, ...])
- PKGDB_FILES = None
- REPODB = None
-
- @classmethod
- def get_pkgdb_files(cls):
- """
- @return dictionary { pkg_name: ["file_path1, ...] }
- """
- if cls.PKGDB_FILES is not None:
- return cls.PKGDB_FILES
- SoftwareBaseTestCase.PKGDB_FILES = res = dict(
- (pkg.name, get_pkg_files(pkg)) for pkg in itertools.chain(
- cls.safe_pkgs, cls.dangerous_pkgs))
- return res
-
- @classmethod
- def get_repodb(cls):
- """
- @return list of Repository instances
- """
- if cls.REPODB is not None:
- return cls.REPODB
- SoftwareBaseTestCase.REPODB = res = repository.get_repo_database()
- return res
-
- @classmethod
- def needs_pkgdb(cls):
- """subclass may override this, if it does not need PKGDB database"""
- return True
-
- @classmethod
- def needs_pkgdb_files(cls):
- """subclass may override this, if it needs PKGDB_FILES database"""
- return False
-
- @classmethod
- def needs_repodb(cls):
- """subclass may override this, if it needs REPODB database"""
- return False
-
- def __init__(self, *args, **kwargs):
- LmiTestCase.__init__(self, *args, **kwargs)
- self.longMessage = True #pylint: disable=C0103
-
- def install_pkg(self, pkg, newer=True, repolist=None):
- """
- Use this method instead of function in rpmcache in tests.
- """
- if repolist is None:
- repolist = self.test_repos
- return rpmcache.install_pkg(pkg, newer, repolist)
-
- def ensure_pkg_installed(self, pkg, newer=True, repolist=None):
- """
- Use this method instead of function in rpmcache in tests.
- """
- if repolist is None:
- repolist = self.test_repos
- return rpmcache.ensure_pkg_installed(pkg, newer, repolist)
-
- @classmethod
- def setUpClass(cls): #pylint: disable=C0103
- LmiTestCase.setUpClass.im_func(cls)
-
- # TODO: make dangerous tests work reliably
- # this is just a temporary solution
- cls.run_dangerous = False
-
- cls.test_repos = os.environ.get(
- 'LMI_SOFTWARE_TEST_REPOS', '').split(',')
- use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
- cls.cache_dir = None
- if use_cache:
- cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None)
- if cls.cache_dir is None:
- cls.cache_dir = tempfile.mkdtemp(suffix="software_database")
- if cls.cache_dir:
- cls.prev_dir = os.getcwd()
- if not os.path.exists(cls.cache_dir):
- os.makedirs(cls.cache_dir)
- # rpm packages are expected to be in CWD
- os.chdir(cls.cache_dir)
- if not hasattr(cls, 'safe_pkgs') or not hasattr(cls, 'dangerous_pkgs'):
- if cls.needs_pkgdb():
- safe, dangerous = rpmcache.get_pkg_database(
- use_cache=use_cache,
- dangerous=cls.run_dangerous,
- repolist=cls.test_repos,
- cache_dir=cls.cache_dir if use_cache else None)
- SoftwareBaseTestCase.safe_pkgs = safe
- SoftwareBaseTestCase.dangerous_pkgs = dangerous
- else:
- cls.safe_pkgs = []
- cls.dangerous_pkgs = []
- if cls.needs_pkgdb_files() and not hasattr(cls, 'pkgdb_files'):
- for pkg in cls.dangerous_pkgs:
- if not rpmcache.is_pkg_installed(pkg.name):
- rpmcache.install_pkg(pkg, repolist=cls.test_repos)
- SoftwareBaseTestCase.pkgdb_files = cls.get_pkgdb_files()
- if cls.needs_repodb() and not hasattr(cls, 'repodb'):
- SoftwareBaseTestCase.repodb = cls.get_repodb()
-
- @classmethod
- def tearDownClass(cls): #pylint: disable=C0103
- if hasattr(cls, "repodb") and cls.repodb:
- # set the enabled states to original values
- for repo in cls.repodb:
- if repository.is_repo_enabled(repo) != repo.status:
- repository.set_repo_enabled(repo, repo.status)
- if cls.run_dangerous:
- for pkg in cls.dangerous_pkgs:
- if rpmcache.is_pkg_installed(pkg.name):
- rpmcache.remove_pkg(pkg.name)
- if hasattr(cls, "prev_dir"):
- os.chdir(cls.prev_dir)
-
diff --git a/src/software/test/package.py b/src/software/test/package.py
index d7b2288..500f8b5 100644
--- a/src/software/test/package.py
+++ b/src/software/test/package.py
@@ -21,111 +21,191 @@
Abstraction for RPM package for test purposes.
"""
-import json
+import subprocess
import util
-class Package(object): #pylint: disable=R0902
+class Package(object):
"""
Element of test package database. It's a container for package
- informations. It contains two sets of versions for single package.
- That's meant for updating tests.
+ informations.
"""
- def __init__(self, name, epoch, ver, rel, arch, repo,
- **kwargs):
- """
- Arguments prefixed with 'up_' are for newer package.
- """
+ def __init__(self, name, epoch, ver, rel, arch, repoid,
+ rpm_path, files=None):
self._name = name
- if not epoch or epoch.lower() == "(none)":
+ if not epoch or (isinstance(epoch, basestring)
+ and epoch.lower() == "(none)"):
epoch = "0"
- self._epoch = epoch
+ self._epoch = int(epoch)
self._ver = ver
self._rel = rel
self._arch = arch
- self._repo = repo
- safe = kwargs.get('safe', False)
- self._up_epoch = epoch if safe else kwargs.get('up_epoch', epoch)
- self._up_ver = ver if safe else kwargs.get('up_ver' , ver)
- self._up_rel = rel if safe else kwargs.get('up_rel' , rel)
- self._up_repo = repo if safe else kwargs.get('up_repo', repo)
+ self._repoid = repoid
+ self._rpm_path = rpm_path
+ if files is None:
+ files = set()
+ else:
+ files = set(files)
+ self._files = files
def __str__(self):
return self.get_nevra()
@property
- def name(self): return self._name #pylint: disable=C0111,C0321
- @property
- def epoch(self): return self._epoch #pylint: disable=C0111,C0321
- @property
- def ver(self): return self._ver #pylint: disable=C0111,C0321
+ def name(self):
+ """ :returns: Package name. """
+ return self._name
@property
- def rel(self): return self._rel #pylint: disable=C0111,C0321
+ def epoch(self):
+ """ :returns: Package epoch as an integer. """
+ return self._epoch
@property
- def arch(self): return self._arch #pylint: disable=C0111,C0321
+ def ver(self):
+ """ :returns: Version string of package. """
+ return self._ver
@property
- def repo(self): return self._repo #pylint: disable=C0111,C0321
+ def rel(self):
+ """ :returns: Release string of package. """
+ return self._rel
@property
- def nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(False)
+ def arch(self):
+ """ :returns: Architecture string of package. """
+ return self._arch
@property
- def evra(self): #pylint: disable=C0111,C0321
- return self.get_evra(False)
-
- @property
- def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
- @property
- def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
+ def repoid(self):
+ """ :returns: Repository id of package, where it is available. """
+ return self._repoid
@property
- def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
- @property
- def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
+ def nevra(self):
+ """
+ :returns: Nevra string of package with epoch part always present.
+ """
+ return self.get_nevra('ALWAYS')
@property
- def up_nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(True)
+ def evra(self):
+ """
+ :returns: Evra string of package. That's the same as *nevra* without a
+ name.
+ """
+ attrs = ('epoch', 'ver', 'rel', 'arch')
+ return util.make_evra(*[getattr(self, '_'+a) for a in attrs])
@property
- def up_evra(self): #pylint: disable=C0111,C0321
- return self.get_evra(True)
+ def rpm_path(self):
+ """
+ :returns: Absolute path to rpm package.
+ """
+ return self._rpm_path
@property
- def is_safe(self):
+ def summary(self):
"""
- @return True if properties prefixed with up_ matches those without
- it. In that case a package is suited for non-dangerous tests.
+ :returns: Package summary string.
"""
- return all( getattr(self, a) == getattr(self, 'up_' + a)
- for a in ('epoch', 'ver', 'rel', 'repo'))
+ return subprocess.check_output(
+ ['/usr/bin/rpm', '-q', '--qf', '%{SUMMARY}', '-p',
+ self.rpm_path])
- def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
+ def get_nevra(self, with_epoch='NOT_ZERO'):
"""
- @newer if True, evr part is made from properties prefixed with 'up_'
- @return pkg nevra string
+ :param string with_epoch: Says when the epoch part should appear
+ in resulting string. There are following possible values:
+
+ * NOT_ZERO - epoch shall be present if it's greater than 0
+ * ALWAYS - epoch will be present
+ * NEVER - epoch won't be present
+
+ :returns: Package nevra string.
+ :rtype: string
"""
- if newer:
- attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
- else:
- attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
+ attrs = ('name', 'epoch', 'ver', 'rel', 'arch')
return util.make_nevra(*[getattr(self, '_'+a) for a in attrs],
with_epoch=with_epoch)
- def get_evra(self, newer=True):
+ def __contains__(self, path):
+ return path in self._files
+ def __iter__(self):
+ for pkg in self._files.__iter__():
+ yield pkg
+ def __len__(self):
+ return len(self._files)
+ @property
+ def files(self):
"""
- @newer if True, evr part is made from properties prefixed with 'up_'
- @return pkg nevra string
+ :returns: Set of files and directories installed by this package.
+ :rtype: set
"""
- attrs = [('up_' if newer else '')+a for a in ('epoch', 'ver', 'rel')]
- attrs.append('arch')
- return util.make_evra(*[getattr(self, '_'+a) for a in attrs])
+ return self._files.copy()
-class PackageEncoder(json.JSONEncoder):
- def default(self, obj):
- if isinstance(obj, Package):
- return obj.__dict__
- return json.JSONEncoder.default(self, obj)
+def to_json(_encoder, pkg):
+ """
+ Converts package object to dictionary which json encoder can handle.
+
+ :param _encoder: Instance of json encoder.
+ :param pkg: Package object to convert.
+ :type pkg: :py:class:`Package`
+ :returns: Dictionary with package attributes.
+ :rtype: dictionary
+ """
+ if not isinstance(pkg, Package):
+ raise TypeError("not a Package object")
+ return pkg.__dict__
def from_json(json_object):
- if '_arch' in json_object:
+ """
+ Constructs a package object from dictionary loaded from json text.
+ Inverse function to :py:func:`to_json`.
+
+ :param dictionary json_object: Deserialized package as a dictionary.
+ :returns: Package object.
+ :rtype: :py:class:`Package`
+ """
+ if isinstance(json_object, dict) and '_arch' in json_object:
kwargs = {k[1:]: v for k, v in json_object.items()}
return Package(**kwargs)
return json_object
+def is_pkg_installed(pkg):
+ """
+ Check, whether package is installed.
+ """
+ if not isinstance(pkg, Package):
+ return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0
+ else:
+ cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra(
+ with_epoch='NEVER') ]
+ try:
+ out = subprocess.check_output(cmd).splitlines()[0]
+ epoch, _ = out.split(':')
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ return int(epoch) == int(pkg.epoch)
+ except subprocess.CalledProcessError:
+ return False
+
+def remove_pkg(pkg, *args):
+ """
+ Remove package with rpm command.
+
+ :param pkg: Either an instance of :py:class:`Package` or package name.
+ If it's a name, any version will be removed. Otherwise the exact
+ version must be installed for command to be successful.
+ :param list args: List of parameters for rpm command.
+ """
+ cmd = ["rpm", "--quiet"] + list(args)
+ if isinstance(pkg, Package):
+ cmd.extend(["-e", pkg.get_nevra()])
+ else:
+ cmd.extend(["-e", pkg])
+ subprocess.call(cmd)
+
+def install_pkg(pkg):
+ """
+ Install a specific package.
+
+ :param pkg: Package object.
+ :type pkg: :py:class:`Package`
+ """
+ if not isinstance(pkg, Package):
+ raise TypeError("pkg must be a Package instance")
+ subprocess.call(["rpm", "--quiet", "-i", pkg.rpm_path])
+
diff --git a/src/software/test/reposetup.py b/src/software/test/reposetup.py
new file mode 100644
index 0000000..93f4429
--- /dev/null
+++ b/src/software/test/reposetup.py
@@ -0,0 +1,658 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Module defining and creating testing packages and repositories for
+OpenLMI Software provider.
+"""
+
+import collections
+import datetime
+import functools
+import json
+import os
+import shutil
+import stat
+import subprocess
+import tempfile
+import time
+import yum
+
+import package
+import repository
+import util
+
+REGULAR, DOC, CONFIG, LINK, CHARDEV, BLOCKDEV, FIFO = [
+ 2**i for i in range(7) ]
+
+#: Holds variables for yum config.
+#: * ``releasever`` - version of distribution ('18' for *Fedora 18*).
+#: * ``disttag`` - dist tag of distribution. This is a part of release string
+#: of packages build for particular distribution
+#: ('.fc18' for *Fedora 18*).
+#: * ``basearch`` - target architecture ('i686', 'x86_64', etc.).
+#: * ``arch`` - system architecture ('ia32e')
+YumVars = collections.namedtuple('YumVars',
+ ('releasever', 'disttag', 'basearch', 'arch'))
+
+#: Dictionary with shortned package names as keys with assigned contents.
+#: Content represented with recursive dictionaries where dictionaries
+#: represent folders and tuples files, pipes, symlinks, etc.
+#: Directories whose names end with '/' will be owned.
+PKG_FILE_ENTRIES = {
+ 'pkg1' : {
+ 'usr/share/openlmi-sw-test-pkg1/' : {
+ 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 1\n'),
+ 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'),
+ 'data/' : { 'file.txt' : (REGULAR, 0644, 'content\n') }
+ },
+ 'etc/openlmi/software/test/' : {
+ 'dummy_config.cfg' :
+ (REGULAR | CONFIG, 0644, '# This is a dummy cfg file\n'),
+ }
+ },
+
+ 'pkg2' : {
+ 'usr/share/openlmi-sw-test-pkg2/' : {
+ 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 2\n'),
+ 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'),
+ 'data/' : {
+ 'target.txt' : (REGULAR, 0644, 'Target of symlinks\n')
+ },
+ 'symlinks/' : {
+ 'relative' : (LINK, 0644, '../data/target.txt'),
+ 'absolute' : (LINK, 0644,
+ 'usr/share/openlmi-sw-text-pkg1/data/target.txt')
+ }
+ },
+ },
+
+ 'pkg3' : {
+ 'usr/share/openlmi-sw-test-pkg3/' : {
+ 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 3\n'),
+ 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'),
+ 'fifos/' : {
+ 'pipe' : (FIFO, 0644, None)
+ },
+ 'devs/' : {
+ 'char12' : (CHARDEV, 0644, (1, 2)),
+ 'char34' : (CHARDEV, 0644, (3, 4)),
+ 'block56' : (BLOCKDEV, 0644, (5, 6)),
+ 'block78' : (BLOCKDEV, 0644, (7, 8))
+ }
+ }
+ },
+
+ 'pkg4' : {
+ 'usr/bin' : {
+ 'openlmi-sw-test-script' :
+ (REGULAR, 0755, '#!/bin/sh\necho "OpenLMI rules!"')
+ },
+ 'usr/share/openlmi-sw-test-pkg4/' : {
+ 'README' : (REGULAR | DOC, 0644, 'This is a test pkg 4\n'),
+ 'Changelog' : (REGULAR | DOC, 0644, 'Nothing new\n'),
+ 'perms/' : { # this will be given 777 perms with the sticky bit
+ 'rwxrwxrwx' : (REGULAR, 0777, ''),
+ 'rw-rw-rw-' : (REGULAR, 0666, ''),
+ 'r--r--r--' : (REGULAR, 0444, ''),
+ 'r-xr-xr-x' : (REGULAR, 0555, ''),
+ 'r---w---x' : (REGULAR, 0421, ''),
+ 'rwSr--r--' : (REGULAR, 04644, ''),
+ 'rw-r-Sr--' : (REGULAR, 02644, ''),
+ 'rw-r--r-T' : (REGULAR, 01644, ''),
+ 'rwsr-sr-t' : (REGULAR, 07755, ''),
+ }
+ },
+ }
+}
+
+#: Repositories with assigned packages.
+REPOS = {
+ 'openlmi-sw-test-repo-stable' : {
+ # grows by revision
+ 'openlmi-sw-test-pkg1-0:1.2.3-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg1']
+ },
+ # grows by epoch
+ 'openlmi-sw-test-pkg2-1:1.2.3-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg2']
+ },
+ # grows by version
+ 'openlmi-sw-test-pkg3-0:1.2.1-3%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg3']
+ },
+ # no update available
+ 'openlmi-sw-test-pkg4-1:1.2.3-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg4']
+ }
+ },
+
+ 'openlmi-sw-test-repo-updates' : {
+ 'openlmi-sw-test-pkg1-0:1.2.3-2%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg1']
+ },
+ 'openlmi-sw-test-pkg2-2:1.2.2-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg2']
+ },
+ 'openlmi-sw-test-pkg3-0:1.2.2-2%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg3']
+ },
+ # exactly the same version as in repo stable
+ 'openlmi-sw-test-pkg4-1:1.2.3-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg4']
+ }
+ },
+
+ 'openlmi-sw-test-repo-updates-testing' : {
+ 'openlmi-sw-test-pkg1-0:1.2.3-3%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg1']
+ },
+ 'openlmi-sw-test-pkg2-3:1.2.0-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg2']
+ },
+ },
+
+ 'openlmi-sw-test-repo-misc' : {
+ # this package conflicts with pkg1 (explicitely)
+ 'openlmi-sw-test-conflict1-0:1.2.3-1%(disttag)s.noarch' : {
+ 'conflicts' : ['openlmi-sw-test-pkg1']
+ },
+ # this package conflicts with pkg2 (by files)
+ 'openlmi-sw-test-conflict2-0:1.2.3-1%(disttag)s.noarch' : {
+ 'files' : PKG_FILE_ENTRIES['pkg1']
+ },
+ # this package depends on few packages above
+ 'openlmi-sw-test-depend1-0:1.2.3-1%(disttag)s.noarch' : {
+ 'requires' : ['openlmi-sw-test-pkg1', 'openlmi-sw-test-pkg2']
+ },
+ # same as previous one, with version requirements
+ 'openlmi-sw-test-depend2-0:1.2.3-1%(disttag)s.noarch' : {
+ 'requires' : [
+ 'openlmi-sw-test-pkg1 = 1.2.3-1%(disttag)s',
+ 'openlmi-sw-test-pkg2 = 2:1.2.2-1%(disttag)s']
+ },
+ # same package as pkg2 except for arch and shared directory
+ 'openlmi-sw-test-pkg1-0:1.2.3-1%(disttag)s.%(basearch)s' : {
+ 'files' : {
+ 'usr/share/openlmi-sw-test-pkg2-arch/' :
+ PKG_FILE_ENTRIES['pkg2'].values()[0]
+ }
+ },
+ # funny (but valid) version string
+ 'openlmi-sw-test-funny-version-0:.1bA.+~-1%(disttag)s.noarch' : {
+ },
+ # funny (but valid) release string
+ 'openlmi-sw-test-funny-release-0:1.2.3-+.rel%(disttag)s.noarch' : {
+ },
+ },
+}
+
+SPEC_TEMPLATE = """\
+Name: %(pkg_name)s
+Epoch: %(epoch)s
+Version: %(version)s
+Release: %(release)s
+License: LGPLv2+
+Source0: %(pkg_name)s-%(epoch)s:%(version)s-%(release)s.%(pkg_arch)s.tar.gz
+%(arch_string)s
+Requires: openlmi-software
+%(requires_string)s
+Summary: This is a test package %(pkg_name)s
+
+%%description
+Test package for OpenLMI Software provider named %(pkg_name)s.
+
+%%prep
+%%setup -q
+
+%%build
+# no build necessary
+
+%%install
+%(install_string)s
+
+%%files
+%(files_string)s
+
+%%changelog
+* Mon Nov 04 2013 Michal Minar <miminar@redhat.com> 0.4.0-2
+- initial commit
+"""
+
+REPO_CONFIG_TEMPLATE = """\
+[%(repoid)s]
+name=%(name)s
+baseurl=file://%(repos_dir)s/%(repoid)s
+gpgcheck=0
+enabled=1
+"""
+
+def full_repo_name(repoid):
+ """
+ Convenient function returning full repository name in case it's in
+ shortened form. Full repository name starts with ``openlmi-sw-test-repo-``.
+
+ :param string repoid: Repository id either in shortened or full form.
+ Must be present in ``REPOS``.
+ :returns: Full repository name.
+ :rtype: string
+ """
+ if not repoid in REPOS:
+ if not repoid.startswith('openlmi-sw-test-repo-'):
+ repoid = 'openlmi-sw-test-repo-' + repoid
+ if not repoid in REPOS:
+ raise ValueError('invalid repoid: %s' % repoid)
+ return repoid
+
+def full_pkg_name(pkg_name):
+ """
+ Convenient function returning full package name in case it's in
+ shortened form. Full package name starts with ``openlmi-sw-test-``.
+
+ :param string pkg_name: Package name either in shortened or full form.
+ Must be present in one of testing repositories.
+ :returns: full_pkg_name
+ :rtype: string
+ """
+ if not pkg_name.startswith('openlmi-sw-test-'):
+ pkg_name = 'openlmi-sw-test-' + pkg_name
+ return pkg_name
+
+def _unwind_files(path, files_dict):
+ """
+ Turns package files dictionary into a list of installed files.
+ Called recursively until all entries from *files_dict* are handled.
+
+ :param string path: Path where files are nested. If called the first time,
+ it should be ``'/'``.
+ :param dictionary files_dict: One of files dictionary from
+ ``PKG_FILE_ENTRIES`` or one with the same format.
+ :returns: List of absolute file paths of all package entries.
+ :rtype: list
+ """
+ result = []
+ for name, entry in files_dict.items():
+ full_path = os.path.join(path, name)
+ if isinstance(entry, dict):
+ if name.endswith('/'):
+ result.append(full_path[:-1])
+ result.extend(_unwind_files(full_path, entry))
+ else:
+ result.append(full_path)
+ return result
+
+def _make_package_object(repo, pkg_nevra, rpm_path, pkg_dict):
+ """
+ Create package object with supplied informations. RPM package needs
+ to be created first.
+
+ :param repo: Instance of repository.
+ :param string pkg_nevra: NEVRA of package to create.
+ :param string rpm_path: Absolute path to rpm file.
+ :param dictionary pkg_dict: One of ``REPOS`` values.
+ :returns: Package object.
+ :rtype: :py:class:`package.Package`
+ """
+ match = util.RE_NEVRA.match(pkg_nevra)
+ pkg = package.Package(match.group('name'),
+ epoch=match.group('epoch'),
+ ver=match.group('ver'),
+ rel=match.group('rel'),
+ arch=match.group('arch'),
+ repoid=repo.repoid,
+ rpm_path=rpm_path,
+ files=_unwind_files('/', pkg_dict.get('files', {})))
+ return pkg
+
+def _write_package_directory(path, entries=None):
+ """
+ Write contents of package to some directory. This content may then be
+ archived into a source tarball.
+
+ :param string path: Path to directory where the package contents shall
+ be written.
+ :param dictionary entries: File dictionary.
+ """
+ if entries is None:
+ entries = {}
+ os.makedirs(path)
+ if path.endswith('perms/'):
+ os.chmod(path, 0777 | stat.S_ISVTX)
+ for name, entry in entries.items():
+ full_path = os.path.join(path, name)
+ if isinstance(entry, dict):
+ _write_package_directory(full_path, entry)
+ elif isinstance(entry, tuple):
+ (ftype, perms, content) = entry
+ if ftype & REGULAR:
+ with open(full_path, 'w') as outf:
+ outf.write(content)
+ elif ftype & LINK:
+ os.symlink(content, full_path)
+ elif ftype & (CHARDEV | BLOCKDEV):
+ mode = ( (stat.S_IFCHR if ftype & CHARDEV else stat.S_IFBLK)
+ | perms)
+ os.mknod(full_path, mode, os.makedev(content[0], content[1]))
+ elif ftype & FIFO:
+ os.mkfifo(full_path, perms)
+ else:
+ raise ValueError('file type 0%o unknown for file "%s"'
+ % (ftype, full_path))
+ if not ftype & (CHARDEV | BLOCKDEV | FIFO | LINK):
+ os.chmod(full_path, perms)
+ else:
+ raise TypeError('enexpected entry in package dictionary: "%s"'
+ % repr(entry))
+
+def _make_source_tarball(rpmbuild_dir, pkg_nevra, pkg_dict):
+ """
+ Create a source tarball for rpm.
+
+ :param string rpmbuild_dir: Path to directory where ``rpmbuild`` shall
+ operate.
+ :param string pkg_nevra: NEVRA string of package to build.
+ :param dictionary pkg_dict: One of values of ``REPOS`` dictionary.
+ :returns: Absolute file path to created tarball.
+ :rtype: string
+ """
+ sources_dir = os.path.join(rpmbuild_dir, 'SOURCES')
+ if not os.path.exists(sources_dir):
+ os.mkdir(sources_dir)
+ match = util.RE_NEVRA.match(pkg_nevra)
+ tarball_path = os.path.join(sources_dir, pkg_nevra + '.tar.gz')
+ archive_dir = tempfile.mkdtemp()
+ try:
+ src_dir = os.path.join(archive_dir,
+ '%s-%s' % (match.group('name'), match.group('ver')))
+ _write_package_directory(src_dir, pkg_dict.get('files', {}))
+ subprocess.call(['/usr/bin/tar', '-C', archive_dir,
+ '-czf', tarball_path, os.path.basename(src_dir)])
+ return tarball_path
+ finally:
+ shutil.rmtree(archive_dir)
+
+def _make_spec_files_string(path, entries):
+ """
+ Make a string used in spec file template to define a list of files
+ belonging to package. Called recursively.
+
+ :param string path: Absolute path to an entry to be added.
+ :param dictionary entries: Dictionary representing directory content to
+ include in resulting list.
+ :returns: List of lines. Each representing one file or directory.
+ :rtype: list
+ """
+ flag_to_prefix = {
+ DOC : '%doc ',
+ CONFIG : '%config(noreplace) ',
+ 0 : ''
+ }
+ res = []
+ for name, entry in entries.items():
+ full_path = os.path.join(path, name)
+ if isinstance(entry, dict):
+ if name.endswith('perms/'):
+ res.append('%dir %attr(1777, -, -) ' + full_path)
+ elif name.endswith('/'):
+ res.append('%dir ' + full_path)
+ res.extend(_make_spec_files_string(full_path, entry))
+ elif isinstance(entry, tuple):
+ if REGULAR & entry[0]:
+ attrs = '%%attr(%o, -, -) ' % entry[1]
+ else:
+ attrs = ''
+ res.append(flag_to_prefix[entry[0] & (DOC | CONFIG)]
+ + attrs + full_path)
+ else:
+ raise TypeError('enexpected entry in package dictionary: "%s"'
+ % repr(entry))
+ return res
+
+def _get_rpm_name_from_spec(spec_file_path):
+ """
+ :param string spec_file_path: Absolute file path to spec file.
+ :returns: Name of rpm file builded out of particular spec file.
+ :rtype: string
+ """
+ return subprocess.check_output(['/usr/bin/rpm', '-q', '--specfile',
+ spec_file_path]).splitlines()[0] + '.rpm'
+
+def _build_pkg(rpmbuild_dir, pkg_nevra, pkg_dict):
+ """
+ Build RPM package.
+
+ :param string rpmbuild_dir: Absolute path to directory where
+ the build takes place.
+ :param string pkg_nevra: Nevra string of package to build.
+ :param dictionary pkg_dict: One of ``REPOS`` values.
+ :returns: Absolute file path to created rpm.
+ :rtype: string
+ """
+ config = get_yum_config()
+ specs_dir = os.path.join(rpmbuild_dir, 'SPECS')
+ build_dir = os.path.join(rpmbuild_dir, 'BUILD')
+ rpm_dir = os.path.join(rpmbuild_dir, 'RPMS')
+ if not os.path.exists(specs_dir):
+ os.mkdir(specs_dir)
+ if not os.path.exists(build_dir):
+ os.mkdir(build_dir)
+ spec_path = os.path.join(specs_dir, pkg_nevra + '.spec')
+ match = util.RE_NEVRA.match(pkg_nevra)
+ if not match:
+ raise ValueError('pkg_nevra is not a valid nevra: "%s"' % pkg_nevra)
+ with open(spec_path, 'w') as spec:
+ spec_args = {
+ "pkg_name" : match.group('name'),
+ "epoch" : match.group('epoch'),
+ "version" : match.group('ver'),
+ "release" : match.group('rel'),
+ "pkg_arch" : match.group('arch')
+ }
+ if match.group('arch') == 'noarch':
+ spec_args['arch_string'] = 'BuildArch: %s\n' % match.group('arch')
+ else:
+ spec_args['arch_string'] = ''
+ for deps in ('requires', 'conflicts'):
+ if deps in pkg_dict and pkg_dict[deps]:
+ spec_args[deps + '_string'] = "\n".join(
+ ('%s: %s' % (deps.capitalize(), r))
+ for r in pkg_dict[deps]) % config._asdict()
+ else:
+ spec_args[deps + '_string'] = ''
+ spec_args['files_string'] = "\n".join(
+ _make_spec_files_string('/', pkg_dict.get('files', {})))
+ if pkg_dict.get('files', []):
+ spec_args['install_string'] = 'cp -a ./* ${RPM_BUILD_ROOT}/'
+ else:
+ spec_args['install_string'] = ''
+ spec.write(SPEC_TEMPLATE % spec_args)
+
+ subprocess.call(['/usr/bin/rpmbuild', '--quiet',
+ '-ba', # build rpm package
+ '-D', '_topdir %s' % rpmbuild_dir, # build in rpmbuild_dir
+ spec_path # from this spec
+ ])
+ return os.path.join(rpm_dir, match.group('arch'),
+ _get_rpm_name_from_spec(spec_path))
+
+def _make_pkg(rpmbuild_dir, pkg_nevra, pkg_dict):
+ """
+ :returns: Absolute path to created rpm package.
+ :rtype: string
+ """
+ _make_source_tarball(rpmbuild_dir, pkg_nevra, pkg_dict)
+ return _build_pkg(rpmbuild_dir, pkg_nevra, pkg_dict)
+
+def _create_repo(repos_dir, repoid, packages):
+ """
+ :returns: Absolute path to repository configuration file.
+ :rtype: string
+ """
+ config_path = os.path.join('/etc/yum.repos.d', repoid + '.repo')
+ repo_dir = os.path.join(repos_dir, repoid)
+ with open(config_path, 'w') as config_file:
+ config_file.write(REPO_CONFIG_TEMPLATE % {
+ 'repoid' : repoid,
+ 'repos_dir' : repos_dir,
+ 'name' : 'OpenLMI Software Test Repository - ' + ' '.join(
+ r.capitalize()
+ for r in repoid[len('openlmi-sw-test-repo-'):].split('-'))
+ })
+ os.makedirs(repo_dir)
+ for rpm_path in packages:
+ shutil.move(rpm_path, repo_dir)
+ subprocess.call(['/usr/bin/createrepo', '--quiet', repo_dir])
+ return config_path
+
+def get_yum_config():
+ """
+ :returns: Yum configuration variables for this system.
+ :rtype: :py:class:`YumVars`
+ """
+ if not hasattr(get_yum_config, '_yum_vars'):
+ yumvar_dict = yum.YumBase().conf.yumvar
+ kwargs = { f: yumvar_dict[f]
+ for f in YumVars._fields if f in yumvar_dict}
+ kwargs['disttag'] = '.' + subprocess.check_output(
+ ['/usr/bin/rpm', '-q', '--qf', '%{RELEASE}\n'
+ , 'kernel']).splitlines()[0].split('.')[-1]
+ get_yum_config._yum_vars = YumVars(**kwargs)
+ return get_yum_config._yum_vars
+
+def make_repositories(repos_dir):
+ """
+ :returns: ``{ repoid : (config_path, rpms), ... }``
+ Where rpms is a list of pairs:
+ ``[ (pkg_nevra, rpm_path), ... ]``
+ :rtype: dictionary
+ """
+ rpmbuild_dir = tempfile.mkdtemp()
+ config = get_yum_config()
+ try:
+ # (repoid, (config_file_path, rpm_package_list))
+ result = {}
+ for repoid, packages in REPOS.items():
+ rpms = []
+ for pkg_tmpl, pkg_dict in packages.items():
+ pkg_nevra = pkg_tmpl % config._asdict()
+ rpms.append(
+ ( pkg_nevra
+ , _make_pkg(rpmbuild_dir, pkg_nevra, pkg_dict)))
+ result[repoid] = (
+ _create_repo(repos_dir, repoid, [r for _, r in rpms]),
+ rpms)
+ finally:
+ shutil.rmtree(rpmbuild_dir)
+ return result
+
+def make_object_database(repos_dir):
+ """
+ :returns: ``(test_repo_db, other_repo_db)`` where
+ both items have the following structure: ``{ repoid : repo, ... }``.
+ First item contains repositories that shall be used for testing.
+ Others are currently present on system and will be disabled during
+ tests.
+ :rtype: tuple
+ """
+ repos = make_repositories(repos_dir)
+ db = dict((r.repoid, r) for r in repository.get_repo_database())
+ result = {}
+ for repoid, (_config_path, rpms) in repos.items():
+ repo = db.pop(repoid)
+ for pkg_nevra, rpm_path in rpms:
+ pkg_name = util.RE_NEVRA.match(pkg_nevra).group('name')
+ pkg_dict = [ d for p, d in REPOS[repoid].items()
+ if p.startswith(pkg_name)][0]
+ pkg = _make_package_object(repo, pkg_nevra,
+ os.path.join(repo.local_path, os.path.basename(rpm_path)),
+ pkg_dict)
+ repo.packages.add(pkg)
+ result[repoid] = repo
+ return (result, db)
+
+class ObjectEncoder(json.JSONEncoder):
+ """
+ Takes care of encoding repository and package objects for json to handle
+ library to handle.
+ """
+ def default(self, obj):
+ if isinstance(obj, repository.Repository):
+ return repository.to_json(self, obj)
+ elif isinstance(obj, package.Package):
+ return package.to_json(self, obj)
+ elif isinstance(obj, datetime.datetime):
+ return int(time.mktime(obj.timetuple()))
+ elif isinstance(obj, set):
+ return list(obj)
+ return json.JSONEncoder.default(self, obj)
+
+def save_test_database(repos_dir, test_repo_db, other_repo_db, dest_path=None):
+ """
+ Writes package database into a file.
+
+ :param string repos_dir: Directory were repositories are stored.
+ :param dictionary test_repo_db: Dictionary with testing repositories.
+ It's the first item returned from :py:func:`make_object_database`.
+ :param dictionary other_repo_db: Dictionary with other repositories.
+ It's the second item returned from :py:func:`make_object_database`.
+ :param string dest_path: Absolute file path, where the given objects will be
+ serialized. If ``None``, some random, temporary file will be created.
+ :returns: File path of written data.
+ :rtype: string
+ """
+ if dest_path is None:
+ dest_path = tempfile.mkstemp()
+ db_file = os.fdopen(dest_path[0], 'w')
+ dest_path = dest_path[1]
+ else:
+ db_file = open(dest_path, 'w')
+ try:
+ data = (repos_dir, test_repo_db, other_repo_db)
+ json.dump(data, db_file, cls=ObjectEncoder,
+ sort_keys=True, indent=4, separators=(',', ': '))
+ finally:
+ db_file.close()
+ return dest_path
+
+def load_test_database(src_path):
+ """
+ This is inverse function to :py:func:`save_pkg_database`.
+
+ :returns: ``(repos_dir, test_repo_db, other_repo_db)`` package lists loaded
+ from file.
+ :rtype: tuple
+ """
+ with open(src_path, 'r') as db_file:
+ repos_dir, test_db, other_db = json.load(
+ db_file,
+ object_hook=functools.partial(
+ repository.from_json, package.from_json))
+ return repos_dir, test_db, other_db
+
+def main():
+ repos_dir = tempfile.mkdtemp()
+ print "created repos dir: %s" % repos_dir
+ db = make_object_database(repos_dir)
+ import pprint
+ pprint.pprint(db)
+
+if __name__ == '__main__':
+ main()
+
diff --git a/src/software/test/repository.py b/src/software/test/repository.py
index bb9b068..d963d43 100644
--- a/src/software/test/repository.py
+++ b/src/software/test/repository.py
@@ -23,6 +23,9 @@ Abstraction of YUM repository for test purposes.
from collections import defaultdict, namedtuple
from datetime import datetime
+import functools
+import inspect
+import json
import os
import re
from subprocess import call, check_output
@@ -40,23 +43,207 @@ RE_REPO_CONFIG = re.compile(
RE_REPO_ENABLED = re.compile(
r'^enabled\s*=\s*(true|false|0|1|yes|no)\s*$', re.I | re.M)
-Repository = namedtuple("Repository", #pylint: disable=C0103
- # repo properties
- [ "repoid"
- , "name"
- , "status"
- , "revision"
- , "tags"
- , "last_updated"
- , "pkg_count"
- , "base_urls"
- , "metalink"
- , "mirror_list"
- , "filename"
- , "cost"
- , "gpg_check"
- , "repo_gpg_check"
- ])
+class Repository(object):
+
+ def __init__(self, repoid, name, status, revision,
+ tags, last_updated, pkg_count, base_urls,
+ metalink, mirror_list, filename, cost, timeout,
+ gpg_check, repo_gpg_check, metadata_expire,
+ config_path=None, packages=None):
+ for attr in ('repoid', 'name', 'tags', 'metalink', 'filename'):
+ setattr(self, attr, locals()[attr])
+ for attr in ('gpg_check', 'repo_gpg_check', 'cost', 'status',
+ 'revision', 'last_updated', 'pkg_count',
+ 'base_urls', 'mirror_list', 'timeout', 'metadata_expire'):
+ setattr(self, '_'+attr, None)
+ self.base_urls = base_urls
+ self.cost = cost
+ self.gpg_check = gpg_check
+ self.last_updated = last_updated
+ self.metadata_expire = metadata_expire
+ self.mirror_list = mirror_list
+ self.pkg_count = pkg_count
+ self.repo_gpg_check = repo_gpg_check
+ self.revision = revision
+ self.status = status
+ self.timeout = timeout
+ if config_path is None:
+ repos_dir = os.environ.get('LMI_SOFTWARE_YUM_REPOS_DIR',
+ '/etc/yum.repos.d')
+ config_path = os.path.join(repos_dir, '%s.repo' % self.repoid)
+ self.config_path = config_path
+ if packages is None:
+ packages = set()
+ self._packages = set(packages)
+
+ @property
+ def base_urls(self):
+ return self._base_urls
+ @base_urls.setter
+ def base_urls(self, base_urls):
+ if not isinstance(base_urls, list):
+ raise TypeError('base_urls must be a list')
+ self._base_urls = base_urls
+
+ @property
+ def local_path(self):
+ if ( len(self._base_urls) == 1
+ and self._base_urls[0].startswith('file://')):
+ return self._base_urls[0][len('file://'):]
+
+ @property
+ def cost(self):
+ return self._cost
+ @cost.setter
+ def cost(self, cost):
+ self._cost = int(cost)
+
+ @property
+ def gpg_check(self):
+ return self._gpg_check
+ @gpg_check.setter
+ def gpg_check(self, value):
+ self._gpg_check = bool(value)
+
+ @property
+ def last_updated(self):
+ return self._last_updated
+ @last_updated.setter
+ def last_updated(self, last_updated):
+ if isinstance(last_updated, int):
+ last_updated = datetime.fromtimestamp(last_updated)
+ if last_updated is not None and not isinstance(last_updated, datetime):
+ raise TypeError('last_updated must be a datetime object')
+ self._last_updated = last_updated
+
+ @property
+ def metadata_expire(self):
+ return self._metadata_expire
+ @metadata_expire.setter
+ def metadata_expire(self, metadata_expire):
+ if not isinstance(metadata_expire, (int, long, basestring)):
+ raise TypeError('metadata_expire must be either integer or datetime')
+ self._metadata_expire = (
+ int(metadata_expire.strip().split(' ')[0])
+ if isinstance(metadata_expire, basestring)
+ else metadata_expire)
+
+ @property
+ def mirror_list(self):
+ return self._mirror_list
+ @mirror_list.setter
+ def mirror_list(self, mirror_list):
+ if mirror_list is not None and not isinstance(mirror_list, basestring):
+ raise TypeError('mirror list must be a string')
+ if not mirror_list:
+ mirror_list = None
+ self._mirror_list = mirror_list
+
+ @property
+ def packages(self):
+ return self._packages
+
+ @property
+ def pkg_count(self):
+ return self._pkg_count
+ @pkg_count.setter
+ def pkg_count(self, pkg_count):
+ self._pkg_count = None if pkg_count is None else int(pkg_count)
+
+ @property
+ def repo_gpg_check(self):
+ return self._repo_gpg_check
+ @repo_gpg_check.setter
+ def repo_gpg_check(self, value):
+ self._repo_gpg_check = bool(value)
+
+ @property
+ def revision(self):
+ return self._revision
+ @revision.setter
+ def revision(self, value):
+ self._revision = None if value is None else int(value)
+
+ @property
+ def status(self):
+ return self._status
+ @status.setter
+ def status(self, value):
+ self._status = bool(value)
+
+ @property
+ def timeout(self):
+ return self._timeout
+ @timeout.setter
+ def timeout(self, timeout):
+ self._timeout = float(timeout)
+
+ def __getitem__(self, pkg_name):
+ pkg_dict = {p.name : p for p in self._packages}
+ try:
+ return pkg_dict[pkg_name]
+ except KeyError:
+ try:
+ return pkg_dict['openlmi-sw-test-' + pkg_name]
+ except KeyError:
+ raise KeyError(pkg_name)
+
+ def refresh(self):
+ updated_attrs = set()
+ for attr, value in _parse_repo_file(self.repoid).items():
+ try:
+ setattr(self, attr, value)
+ updated_attrs.add(attr)
+ except AttributeError:
+ print 'can\'t set attribute "%s"' % attr
+ if not self.status:
+ for attr in ('revision', 'tags', 'last_updated'):
+ if attr not in updated_attrs and getattr(self, attr) is not None:
+ setattr(self, attr, None)
+
+def to_json(encoder, repo):
+ """
+ Converts repository object to dictionary which json encoder can handle.
+
+ :param encoder: Instance of json encoder.
+ :param repo: Package object to convert.
+ :type repo: :py:class:`Repository`
+ :returns: Dictionary with repository attributes.
+ :rtype: dictionary
+ """
+ if not isinstance(repo, Repository):
+ raise TypeError("not a repo object")
+ def transform_value(val):
+ if not isinstance(val, (int, long, float, tuple, list,
+ dict, bool, basestring)) and val is not None:
+ return encoder.default(val)
+ if isinstance(val, (list, tuple)):
+ return type(val)((transform_value(v) for v in val))
+ return val
+ argspec = inspect.getargspec(repo.__init__)
+ return { a: transform_value(getattr(repo, a)) for a in argspec.args[1:] }
+
+def from_json(package_deserializer, json_object):
+ """
+ Constructs a repository object from dictionary loaded from json text.
+ Inverse function to :py:func:`to_json`.
+
+ :param callable package_deserializer: Function taking encoder and package
+ dictionary as arguments, returning new package object.
+ :param dictionary json_object: Deserialized repository as a dictionary.
+ :returns: Package object.
+ :rtype: :py:class:`Package`
+ """
+ if 'gpg_check' in json_object:
+ kwargs = {}
+ for key, value in json_object.items():
+ if isinstance(value, (tuple, list)):
+ value = type(value)((package_deserializer(v) for v in value))
+ else:
+ value = package_deserializer(value)
+ kwargs[key] = value
+ return Repository(**kwargs)
+ return json_object
# example of repo information
#Repo-id : updates-testing/18/x86_64
@@ -73,9 +260,12 @@ Repository = namedtuple("Repository", #pylint: disable=C0103
def _parse_repo_file(repo_name):
"""
- Parse output of yum-config-manager command for single repository.
- @return dictionary with key-value pairs suitable for passing to Repository
- constructor.
+ Parse output of ``yum-config-manager`` command for single repository.
+
+ :param string repo_name: Repository id.
+ :returns: Dictionary with key-value pairs suitable for passing to
+ :py:class:`Repository` constructor.
+ :rtype: dictionary
"""
cmd = ["yum-config-manager", repo_name]
out = check_output(cmd).decode('utf-8')
@@ -93,17 +283,23 @@ def _parse_repo_file(repo_name):
result[tag] = int(value)
elif tag == "cost":
result[tag] = int(value)
- else:
+ elif tag == 'baseurl':
+ result['base_urls'] = [value]
+ elif tag == 'enabled':
+ result['status'] = value.lower() in ('true', 'yes', 1)
+ elif tag == 'mirrorlist':
+ result['mirror_list'] = value
+ elif tag in {p for p in Repository.__dict__ if not p.startswith('_')}:
continue
return result
-def make_repo(repo_info):
+def make_repo(repo_info, packages=None):
"""
Makes a Repository instance from string dumped by yum repoinfo command.
"""
metadata = defaultdict(lambda : None)
metadata["base_urls"] = []
- fields = set(Repository._fields)
+ argspec = inspect.getargspec(Repository.__init__)
for match in RE_REPO_TAG.finditer(repo_info):
tag = match.group('tag')
value = match.group('value')
@@ -116,7 +312,7 @@ def make_repo(repo_info):
, 'mirrors' : 'mirror_list'
}[tag]
except KeyError:
- if tag not in fields:
+ if tag not in argspec.args:
continue # unexpeted, or unwanted tag found
new_tag = tag
if new_tag == 'repoid':
@@ -135,26 +331,41 @@ def make_repo(repo_info):
metadata[new_tag] = int(value)
else:
metadata[new_tag] = value
+ if packages is None:
+ packages = set()
+ metadata['packages'] = packages
config_items = _parse_repo_file(metadata['repoid'])
- for field in Repository._fields:
+ for field in argspec.args[1:]: # leave out self argument
if not field in metadata:
metadata[field] = config_items.get(field, None)
- return Repository(**dict((f, metadata[f]) for f in Repository._fields))
+ return Repository(**metadata)
+
+def get_repo_list(kind='all'):
+ if kind.lower() not in ('all', 'enabled', 'disabled'):
+ raise ValueError('kind must be on of {"all", "enabled", "disabled"}')
+ cmd = ["yum", "-q", "repoinfo", "all"]
+ return list(
+ RE_REPO_TAG.search(m.group(1)).group('value').split('/')[0]
+ for m in RE_REPO_INFOS.finditer(
+ check_output(cmd).decode('utf-8')))
def get_repo_database():
"""
- @return list of Repository instances for all configured repositories.
+ :returns: List of repository objects for all configured repositories.
+ :rtype: list
"""
result = []
cmd = ["yum", "-q", "repoinfo", "all"]
- repo_infos = check_output(cmd, env={'LANG': 'C'}).decode('utf-8')
+ repo_infos = check_output(cmd).decode('utf-8')
for match in RE_REPO_INFOS.finditer(repo_infos):
result.append(make_repo(match.group(1)))
return result
def is_repo_enabled(repo):
"""
- @return True, if repository is enabled
+ :param repo: Either a repository id or instance of :py:class:`Repository`.
+ :returns: Whether the repository is enabled.
+ :rtype: boolean
"""
if isinstance(repo, Repository):
repo = repo.repoid
@@ -166,6 +377,9 @@ def is_repo_enabled(repo):
def set_repo_enabled(repo, enable):
"""
Enables or disables repository in its configuration file.
+
+ :param repo: Eiether a repository id or instance of :py:class:`Repository`.
+ :param boolean enable: New state of repository.
"""
if isinstance(repo, Repository):
repo = repo.repoid
diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py
deleted file mode 100644
index b4be748..0000000
--- a/src/software/test/rpmcache.py
+++ /dev/null
@@ -1,541 +0,0 @@
-#!/usr/bin/python
-# -*- Coding:utf-8 -*-
-#
-# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details. #
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Radek Novacek <rnovacek@redhat.com>
-# Authors: Michal Minar <miminar@redhat.com>
-
-"""
-Creation and manipulation utilities with rpm cache for software tests.
-"""
-
-import copy
-import os
-import json
-import random
-import re
-from collections import defaultdict
-from subprocess import call, check_output, CalledProcessError
-
-import util
-from package import Package, PackageEncoder, from_json
-
-DB_BACKUP_FILE = 'lmi_software_test_cache'
-
-RE_AVAIL_PKG = re.compile(
- r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
- r'-(?P<rel>[a-zA-Z0-9_.]+)\s+'
- r'(?P<repo>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
-# this won't match the last entry, unless "package\n" is not appended
-# at the end of the string
-RE_PKG_DEPS = re.compile(
- r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
- r'-(?P<rel>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
- r'(?=^package|\Z)', re.MULTILINE | re.DOTALL)
-RE_DEPS_PROVIDERS = re.compile(
- r'^\s*dependency:\s*(?P<dependency>.+?)\s*'
- r'(?P<providers>(^\s+unsatisfied\s+dependency$)|(^\s+provider:.+?$)+)',
- re.MULTILINE | re.IGNORECASE)
-RE_DEPS_UNSATISFIED = re.compile(r'^\s+unsatisfied\s+dep.*$', re.IGNORECASE)
-RE_PROVIDER = re.compile(
- r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
- r'-(?P<rel>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
-RE_PKG_INFO = re.compile(
- r'^Name\s*:\s*(?P<name>[^\s]+).*?'
- r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?'
- r'^Version\s*:\s*(?P<ver>[a-zA-Z0-9._+-]+)\s+'
- r'^Release\s*:\s*(?P<rel>[^\s]+)\s+.*?'
- r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
- re.MULTILINE | re.DOTALL | re.IGNORECASE)
-# matching packages won't be selected for dangerous tests
-RE_AVAILABLE_EXCLUDE = re.compile(
- r'^(kernel|tog-pegasus|sblim|.*openlmi).*')
-
-# Maximum number of packages, that will be selected for testing / 2
-# There are 2 sets of test packages (safe and dangerous). When running
-# in dangerous mode, the resulting number will be twice as much.
-MAX_PKG_DB_SIZE = 3
-# step used to iterate over package names used to check for thery dependencies
-# it's a number of packages, that will be passed to yum command at once
-PKG_DEPS_ITER_STEP = 50
-
-class InvalidTestCache(Exception):
- """Exception saying, that rpm test cache is not valiid."""
- pass
-class MissingRPM(InvalidTestCache):
- """
- Raised, when requested rpm for package is not contained in
- rpm test cache.
- """
- def __init__(self, pkg_name):
- InvalidTestCache.__init__(self,
- "Missing package '%s' in test cache!"%pkg_name)
-
-def _is_sane_package(pkg_name):
- """
- :returns: Whether the given package can be included among tested ones.
- :rtype: boolean
- """
- return RE_AVAILABLE_EXCLUDE.match(pkg_name) is None
-
-def _match_nevr(match):
- """
- @param match is a regexp match object with parsed rpm package
- @return tuple (name, epoch, version, release)
- """
- epoch = match.group('epoch')
- return ( match.group('name')
- , epoch if epoch and epoch.lower() != "(none)" else "0"
- , match.group('ver')
- , match.group('rel'))
-
-def _match2pkg(match, safe=False):
- """
- @param match is a regexp match object with attributes:
- name, epoch, version, release, arch
- @param safe if True, the packe will have up_* properties equal to
- non-prefixed ones, otherwise they will be set to None
- @return instance of Package
- """
- kwargs = {}
- if safe is True:
- kwargs['safe'] = True
- else:
- for attr in ('epoch', 'ver', 'rel', 'repo'):
- kwargs['up_'+attr] = None
- epoch = match.group('epoch')
- if not epoch or epoch.lower() == "(none)":
- epoch = '0'
- return Package(
- match.group('name'),
- epoch,
- match.group('ver'), match.group('rel'),
- match.group('arch'), match.groupdict().get('repo', None),
- **kwargs)
-
-def _filter_duplicates(installed, avail_str):
- """
- Parse output of "yum list available" command and retuns only those
- packages occuring in multiple versions.
- @param installed is a set of installed package names
- @param avail_str yum command output
- @return [ [pkg1v1, pkg1v2, ...], [pkg2v1, pkg2v2, ...], ... ]
- Each sublist of result contain at least 2 elements, that are instances
- of Package.
- """
- dups_list = []
- cur_package_matches = []
- prev_match = None
- system_arch = util.get_system_architecture()
- for match in RE_AVAIL_PKG.finditer(avail_str):
- if ( _match_nevr(match) in [ _match_nevr(m)
- for m in cur_package_matches]
- or ( ( not prev_match
- or prev_match.group('name') in
- [m.group('name') for m in cur_package_matches])
- and match.group('arch') not in ('noarch', system_arch))
- or not _is_sane_package(match.group('name'))):
- continue
- if prev_match and prev_match.group('name') != match.group('name'):
- if ( len(cur_package_matches) > 1
- and not cur_package_matches[0].group('name') in installed):
- pkgs = [ _match2pkg(m) for m in cur_package_matches ]
- dups_list.append(pkgs)
- cur_package_matches = []
- cur_package_matches.append(match)
- prev_match = match
- if len(cur_package_matches) > 1:
- dups_list.append([ _match2pkg(m) for m in cur_package_matches ])
- return dups_list
-
-def _check_single_pkg_deps(
- installed,
- dependencies_str):
- """
- Each package has zero or more dependencies. Each dependency
- has at least one provider. One of these provider must be installed
- for each such dependency.
- @param dependencies of single package
- @return True if all dependencies have at least one provider installed
- """
- for match_deps in RE_DEPS_PROVIDERS.finditer(dependencies_str):
- providers = []
- if RE_DEPS_UNSATISFIED.search(match_deps.group('providers')):
- return False
- for match_dep in RE_PROVIDER.finditer(match_deps.group('providers')):
- if match_dep.group('name') not in installed:
- continue
- providers.append(_match2pkg(match_dep))
- for provider in providers:
- if is_pkg_installed(provider, False):
- break
- else: # no provider is installed
- return False
- return True
-
-def _check_pkg_dependencies(
- installed,
- dup_list,
- number_of_packages=MAX_PKG_DB_SIZE,
- repolist=None):
- """
- Finds packages from dup_list with satisfied (installed) dependencies.
- @param installed is a set of installed package names
- @return filtered dup_list with at least number_of_packages elements.
- """
- yum_params = ['deplist']
- dups_no_deps = []
- for i in range(0, len(dup_list), PKG_DEPS_ITER_STEP):
- dups_part = dup_list[i:i+PKG_DEPS_ITER_STEP]
- yum_params = yum_params[:1]
- for dups in dups_part:
- yum_params.extend([d.get_nevra(newer=False) for d in dups])
- deplist_str = util.run_yum(*yum_params, repolist=repolist)
- matches = RE_PKG_DEPS.finditer(deplist_str)
- prev_match = None
- for pkgs in dups_part:
- satisfied = True
- remains = len(pkgs)
- for match_pkg in matches:
- if ( prev_match
- and _match_nevr(prev_match) == _match_nevr(match_pkg)):
- # there sometimes appear duplicates in yum deplist
- # output, so let's skip them
- continue
- if satisfied and not _check_single_pkg_deps(
- installed, match_pkg.group('dep_list')):
- satisfied = False
- prev_match = match_pkg
- remains -= 1
- if remains <= 0:
- break
- if satisfied:
- # all packages from pkgs have satisfied dependencies
- dups_no_deps.append(pkgs)
- if len(dups_no_deps) >= number_of_packages:
- break
- return dups_no_deps
-
-def _sorted_db_by_size(pkgdb, repolist=None):
- """
- @param pkgdb is a list of lists of packages with common name
- @return sorted instances of Package according to their size
- """
- yum_params = ['info', '--showduplicates']
- yum_params.extend([ps[0].name for ps in pkgdb])
- info_str = util.run_yum(*yum_params, repolist=repolist)
- pkg_sizes = {}
- # to get correct ordering from "yum info" command
- # { pkg_name : [(epoch, version, release), ... ] }
- pkg_version_order = defaultdict(list)
- try:
- header = "Available Packages\n"
- info_str = info_str[info_str.index(header)+len(header):]
- except ValueError:
- pass
- for info_match in RE_PKG_INFO.finditer(info_str):
- pkg_name = info_match.group('name')
- size = float(info_match.group('size'))
- units = info_match.group('units')
- if units:
- size *= defaultdict(lambda: 1,
- {'k':10**3, 'm':10**6, 'g':10**9})[units.lower()]
- pkg_sizes[pkg_name] = size
- epoch = info_match.group('epoch')
- if not epoch:
- epoch = "0"
- pkg_version_order[pkg_name].append((
- epoch, info_match.group('ver'), info_match.group('rel')))
- pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[
- :MAX_PKG_DB_SIZE]
-
- for i, pkgs in enumerate(pkgdb):
- pkgs = sorted(pkgs, key=lambda p:
- pkg_version_order[pkgs[0].name].index((p.epoch, p.ver, p.rel)))
- pkg_kwargs = dict((k, getattr(pkgs[0], k)) for k in ('name', 'arch') )
- for attr in ('epoch', 'ver', 'rel', 'repo'):
- pkg_kwargs[attr] = getattr(pkgs[0], attr)
- pkg_kwargs['up_'+attr] = getattr(pkgs[-1], attr)
- pkgdb[i] = Package(**pkg_kwargs)
- return pkgdb
-
-def _download_dangerous(repolist, pkgdb, cache_dir=None):
- """
- Downloads all rpm packages (old and newer versions) from package database
- to current directory.
- """
- repo_pkgs = defaultdict(list)
- for pkg in pkgdb:
- repo_pkgs[pkg.repo].append(pkg.name)
- repo_pkgs[pkg.up_repo].append(pkg.name)
- base_cmd = ['yumdownloader']
- if cache_dir:
- base_cmd.extend(['--destdir', cache_dir])
- for repo, pkgs in repo_pkgs.items():
- cmd = copy.copy(base_cmd)
- repos = set(repolist)
- try:
- repos.remove(repo)
- except KeyError: # there may be duplicates
- continue
- for not_allowed_repo in repos:
- cmd.append('--disablerepo='+not_allowed_repo)
- cmd.append('--enablerepo='+repo)
- cmd.extend(pkgs)
- call(cmd)
-
-def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False):
- """
- @param newer says, whether to use EVR of package to update
- (in this case, all epoch/ver/rel attributes will be prefixed with "up_")
- @param without_epoch if True, epoch will be left out of package name
- @return path to rpm package made from instance of Package
- """
- if not isinstance(pkg, Package):
- raise TypeError("pkg must be an instance of Package ")
- nevra = pkg.get_nevra(newer,
- with_epoch='NEVER' if without_epoch else 'NOT_ZERO')
- return os.path.join(cache_dir, nevra) + '.rpm'
-
-def get_rpm_name(pkg, cache_dir='', newer=True):
- """
- Some packages do not have epoch in their name, even if it's higher than
- zero. That's why it's necessary to try more variants of rpm name.
- @return rpm path to package in cache
- """
- path = _make_rpm_path(pkg, cache_dir, newer)
- if os.path.exists(path):
- return path
- path = _make_rpm_path(pkg, cache_dir, newer, True)
- if os.path.exists(path):
- return path
- raise MissingRPM(pkg.name)
-
-def rpm_exists(pkg, cache_dir='', newer=True):
- """
- @return True, when rpm package is in cache.
- """
- return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
- or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
-
-
-def remove_pkg(pkg, *args):
- """
- Remove package with rpm command.
- @param pkg is either instance of Package or package name
- @param args is a list of parameters for rpm command
- """
- if isinstance(pkg, Package):
- pkg = pkg.name
- call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
-
-def install_pkg(pkg, newer=True, repolist=None):
- """
- Install a specific package.
- @param pkg is either package name or instance of Package
- In latter case, a specific version is installed.
- @param repolist is a list of repositories, that should be
- used for downloading, if using yum
- when empty, all enabled repositories are used
- """
- if isinstance(pkg, Package):
- try:
- rpm_name = get_rpm_name(pkg, newer=newer)
- call(["rpm", "--quiet", "-i", rpm_name])
- return
- except MissingRPM:
- pass
- pkg = pkg.name
- util.run_yum('-q', '-y', 'install', pkg, repolist=repolist)
-
-def ensure_pkg_installed(pkg, newer=True, repolist=None):
- """
- Ensures, that specific version of package is installed. If other
- version is installed, it is removed and reinstalled.
- """
- if not isinstance(pkg, Package):
- raise TypeError("pkg must be a Package instance")
- if not is_pkg_installed(pkg, newer):
- if is_pkg_installed(pkg.name):
- remove_pkg(pkg.name)
- install_pkg(pkg, newer, repolist)
-
-def verify_pkg(pkg):
- """
- @return output of command rpm, with verification output for package
- """
- if isinstance(pkg, basestring):
- name = pkg
- elif isinstance(pkg, Package):
- name = pkg.name
- else:
- raise TypeError("pkg must be either package name or Package instance")
- return call(["rpm", "--quiet", "-Va", name]) == 0
-
-def has_pkg_config_file(pkg, file_path):
- """
- @return True, if file_path is a configuration file of package pkg.
- """
- cmd = ['rpm', '-qc', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
-
-def has_pkg_doc_file(pkg, file_path):
- """
- @return True, if file_path is a documentation file of package pkg.
- """
- cmd = ['rpm', '-qd', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
-
-def is_pkg_installed(pkg, newer=True):
- """
- Check, whether package is installed.
- """
- if not isinstance(pkg, Package):
- return call(["rpm", "--quiet", "-q", pkg]) == 0
- else:
- cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra(
- newer, with_epoch='NEVER') ]
- try:
- out = check_output(cmd).splitlines()[0]
- epoch, nvra = out.split(':') #pylint: disable=E1103
- if not epoch or epoch.lower() == "(none)":
- epoch = "0"
- return ( epoch == getattr(pkg, 'up_epoch' if newer else 'epoch')
- and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER"))
- except CalledProcessError:
- return False
-
-def write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir=''):
- """
- Writes package database into a file named DB_BACKUP_FILE.
- """
- with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file:
- data = (safe_pkgs, dangerous_pkgs)
- json.dump(data, db_file, cls=PackageEncoder,
- sort_keys=True, indent=4, separators=(',', ': '))
-
-def load_pkgdb(cache_dir=''):
- """
- This is inverse function to _write_pkgdb().
- @return (safe, dangerous) package lists loaded from file
- """
- with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file:
- safe, dangerous = json.load(db_file, object_hook=from_json)
- #print "Loaded package database from: %s" % date_time
- return safe, dangerous
-
-def make_dangerous_list(installed, repolist=None):
- """
- This makes a list of instances of Package for dangerous tests.
- """
- avail_str = util.run_yum('list', 'available', '--showduplicates',
- repolist=repolist)
- # list of lists of packages with the same name, longer than 2
- dups_list = _filter_duplicates(installed, avail_str)
- selected = _check_pkg_dependencies(installed, dups_list,
- number_of_packages=MAX_PKG_DB_SIZE*5,
- repolist=repolist)
- return _sorted_db_by_size(selected, repolist=repolist)
-
-def make_safe_list(installed, exclude=None):
- """
- Makes list of installed packages for non-dangerous tests.
- @param installed is a list of installed packages names
- @param exclude is a list of package names, that won't appear in result
- """
- if exclude is None:
- exclude = set()
- base = list(installed)
- random.shuffle(base)
- res = []
- i = 0
- while len(res) < MAX_PKG_DB_SIZE and i < len(base):
- name = base[i]
- i += 1
- if name in exclude or not _is_sane_package(name):
- continue
- cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", name ]
- envra = check_output(cmd).splitlines()[0]
- res.append(_match2pkg(util.RE_ENVRA.match(envra), safe=True))
- return res
-
-def get_pkg_database(
- force_update=False,
- use_cache=True,
- dangerous=False,
- cache_dir='',
- repolist=None):
- """
- Checks yum database for available packages, that have at least two
- different versions in repositories. Only not installed ones with
- all of their dependencies intalled are selected.
- And from those, few of the smallest are downloaded as rpms.
- @param dangerous whether to load available, not installed
- packages for dangerous tests
- @return (safe, dangerous)
- where
- safe is a list of instances of Package, representing installed
- software, these should be used for not-dangerous tests;
- both older
- dangerous is a list of instances of Package of selected packages,
- that are not installed, but available; instances contain
- both newer and older version of package
- """
- dangerous_pkgs = []
- if ( use_cache and not force_update
- and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
- safe_pkgs, dangerous_pkgs = load_pkgdb(cache_dir)
- valid_db = True
- if len(safe_pkgs) < MAX_PKG_DB_SIZE:
- valid_db = False
- elif not dangerous and len(dangerous_pkgs) > 0:
- dangerous_pkgs = []
- elif dangerous and len(dangerous_pkgs) == 0:
- valid_db = False
- else:
- for pkg in safe_pkgs:
- if not is_pkg_installed(pkg):
- valid_db = False
- break
- for pkg in dangerous_pkgs:
- if ( not rpm_exists(pkg, cache_dir, False)
- or not rpm_exists(pkg, cache_dir, True)):
- valid_db = False
- #print "Old package database is not valid"
- break
- if valid_db:
- return (safe_pkgs, dangerous_pkgs)
- #print "Getting installed packages"
- installed = set(check_output( #pylint: disable=E1103
- ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
- if dangerous:
- dangerous_pkgs = make_dangerous_list(installed)
- safe_pkgs = make_safe_list(installed, exclude=set(
- pkg.name for pkg in dangerous_pkgs))
-
- if use_cache:
- repolist = util.get_repo_list() if repolist in (None, []) else repolist
- _download_dangerous(repolist, dangerous_pkgs, cache_dir)
- #print "Backing up database information"
- write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir)
- return (safe_pkgs, dangerous_pkgs)
-
diff --git a/src/software/test/run.py b/src/software/test/run.py
index f673d8a..4dfdf4c 100755
--- a/src/software/test/run.py
+++ b/src/software/test/run.py
@@ -23,8 +23,8 @@ Runs all defined tests.
Test modules are expected to be in directory with this script and
should contain subclasses of unittest.TestCase, that will be loaded.
-Preferably a suite() function should be defined there as well.
-They must be named according to shell regexp: "test_*.py".
+Preferably a ``suite()`` function should be defined there as well.
+They must be named according to shell regexp: ``"test_*.py"``.
"""
import argparse
@@ -33,16 +33,12 @@ import getpass
import inspect
import pywbem
import os
-import shutil
import sys
import tempfile
import unittest
-import rpmcache
-
-# this is a global variable, that can be modified by environment or program
-# argument
-CACHE_DIR = ''
+import reposetup
+import swbase
class NotFound(Exception):
"""Raised when test of particular name could no be found."""
@@ -52,7 +48,9 @@ class NotFound(Exception):
def parse_cmd_line():
"""
Use ArgumentParser to parse command line arguments.
- @return (parsed arguments object, arguments for unittest.main())
+
+ :returns: ``(parsed arguments object, arguments for unittest.main())``.
+ :rtype: tuple
"""
parser = argparse.ArgumentParser(
add_help=False, # handle help message ourselves
@@ -90,37 +88,22 @@ def parse_cmd_line():
default=os.environ.get('LMI_RUN_TEDIOUS', '0') == '1',
help="Disable tedious tests.")
- cache_group = parser.add_mutually_exclusive_group()
- cache_group.add_argument("-c", "--use-cache", action="store_true",
- default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
- help="Use a cache directory to download rpm packages for"
- " testing purposes. It greatly speeds up testing."
- " Also a database file \"${LMI_SOFTWARE_CACHE_DIR}/%s\""
- " will be created to store information"
- " for this system. Overrides environment variable"
- " LMI_SOFTWARE_USE_CACHE." % rpmcache.DB_BACKUP_FILE)
- cache_group.add_argument("--no-cache", action="store_false",
- dest="use_cache",
- default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
- help="Do not cache rpm packages for speeding up tests."
- " Overrides environment variable LMI_SOFTWARE_USE_CACHE.")
- parser.add_argument('--cache-dir',
- default=os.environ.get('LMI_SOFTWARE_CACHE_DIR', ''),
- help="Use particular directory, instead of temporary one, to store"
- " rpm packages for testing purposes. Overrides environment"
- " variable LMI_SOFTWARE_CACHE_DIR.")
- parser.add_argument('--test-repos',
- default=os.environ.get('LMI_SOFTWARE_REPOSITORIES', ''),
- help="Use this option to specify list of repositories, that"
- " alone should be used for testing. Overrides environment"
- " variable LMI_SOFTWARE_REPOSITORIES.")
- parser.add_argument('--test-packages',
- default=os.environ.get('LMI_SOFTWARE_PACKAGES', ''),
- help="Specify packages for dangerous tests. If empty"
- " and cache is enabled, some will be picked up by algorithm")
- parser.add_argument('--force-update', action="store_true",
- help="Force update of package database. Otherwise an old"
- " one will be used (if any exists).")
+ cleanup_group = parser.add_mutually_exclusive_group()
+ cleanup_group.add_argument("--cleanup-cache", action="store_true",
+ default=os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1') == '1',
+ help="Clean up all temporary files created for testing purposes."
+ " If LMI_SOFTWARE_DB_CACHE is set and this is set to False, next"
+ " run will be much faster because testing database won't need to"
+ " be generated again. Overrides environment variable"
+ " LMI_SOFTWARE_CLEANUP_CACHE.")
+ cleanup_group.add_argument('--no-cleanup', action="store_false",
+ dest="cleanup_cache",
+ default=os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1') == '1',
+ help="Do not delete database cache file and created repositories."
+ " This speeds up testing because testing repositories don't need"
+ " to be recreated upon next run. This applies only when"
+ " LMI_SOFTWARE_DB_CACHE is set.")
+
parser.add_argument('-l', '--list-tests', action="store_true",
help="List all possible test names.")
parser.add_argument('-h', '--help', action="store_true",
@@ -147,7 +130,9 @@ def try_connection(args):
"""
Try to connect to cim broker. If authentication fails, ask
the user for credentials in loop.
- @return (user, password)
+
+ :returns: ``(user, password)``
+ :rtype: tuple
"""
user = args.user
password = args.password
@@ -166,6 +151,9 @@ def try_connection(args):
def prepare_environment(args):
"""
Set the environment for test scripts.
+
+ :returns: Whether the database cache needs to be deleted.
+ :rtype: boolean
"""
os.environ['LMI_CIMOM_URL'] = args.url
os.environ['LMI_CIMOM_USERNAME'] = args.user
@@ -174,16 +162,23 @@ def prepare_environment(args):
'1' if args.run_dangerous else '0')
os.environ["LMI_RUN_TEDIOUS"] = (
'1' if args.run_tedious else '0')
- os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0'
- if args.use_cache:
- os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR
- os.environ['LMI_SOFTWARE_REPOSITORIES'] = args.test_repos
- os.environ['LMI_SOFTWARE_PACKAGES'] = args.test_packages
+ db_cache = os.environ.get("LMI_SOFTWARE_DB_CACHE", None)
+ needs_cleanup = args.cleanup_cache or not bool(db_cache)
+ if not db_cache or not os.path.exists(db_cache):
+ repos_dir = tempfile.mkdtemp()
+ repodb, other_repos = reposetup.make_object_database(repos_dir)
+ db_cache = reposetup.save_test_database(
+ repos_dir, repodb, other_repos, db_cache)
+ os.environ["LMI_SOFTWARE_DB_CACHE"] = db_cache
+ os.environ['LMI_SOFTWARE_CLEANUP_CACHE'] = (
+ '1' if needs_cleanup else '0')
+ return needs_cleanup
def load_tests(loader, standard_tests, pattern):
"""
- Helper function for unittest.main() test loader.
- @return TestSuite instance
+ Helper function for ``unittest.main()`` test loader.
+
+ :returns: :py:class:`unittest.TestSuite` instance.
"""
this_dir = os.path.dirname(__file__)
if standard_tests:
@@ -206,10 +201,12 @@ class LMITestLoader(unittest.TestLoader):
def find_in_test_nodes(node, name):
"""
Traverses suite tree nodes to find a test named name.
- @param name is a name of test to find
- @return desired TestCase or test function
+
+ :param string name: Name of test to find.
+ :returns: Desired :py:class:`TestCase` or test function.
"""
if ( isinstance(node, unittest.TestSuite)
+ and node.countTestCases() > 0
and isinstance(next(iter(node)), unittest.TestCase)
and next(iter(node)).__class__.__name__ == name):
return node
@@ -242,8 +239,10 @@ class LMITestLoader(unittest.TestLoader):
def unwind_test_suite_tree(node):
"""
Make a list of test names out of TestSuite.
- @param node is a suite
- @return [ test_name, ... ]
+
+ :param node: A test suite.
+ :type node: :py:class:`unittest.TestSuite`
+ :returns: ``[ test_name, ... ]``
"""
result = []
for subnode in node:
@@ -269,32 +268,17 @@ def main():
"""
Main functionality of script.
"""
- global CACHE_DIR
args, ut_args = parse_cmd_line()
if args.list_tests:
list_tests()
- if args.use_cache and not args.cache_dir:
- CACHE_DIR = tempfile.mkdtemp(suffix="software_database")
- elif args.use_cache:
- CACHE_DIR = args.cache_dir
- if not os.path.exists(args.cache_dir):
- os.makedirs(args.cache_dir)
try_connection(args)
- if args.test_repos:
- repolist = args.test_repos.split(',')
- else:
- repolist = []
- rpmcache.get_pkg_database(
- args.force_update,
- args.use_cache,
- dangerous=args.run_dangerous,
- cache_dir=CACHE_DIR,
- repolist=repolist)
- prepare_environment(args)
- test_program = unittest.main(__name__, argv=ut_args,
- testLoader=LMITestLoader(), exit=False)
- if args.use_cache and not args.cache_dir:
- shutil.rmtree(CACHE_DIR)
+ cleanup = prepare_environment(args)
+ swbase.SwTestCase.setUpClass()
+ try:
+ test_program = unittest.main(__name__, argv=ut_args,
+ testLoader=LMITestLoader(), exit=False)
+ finally:
+ swbase.SwTestCase.tearDownClass()
sys.exit(0 if test_program.result.wasSuccessful() else 1)
if __name__ == '__main__':
diff --git a/src/software/test/swbase.py b/src/software/test/swbase.py
new file mode 100644
index 0000000..ccf2b78
--- /dev/null
+++ b/src/software/test/swbase.py
@@ -0,0 +1,288 @@
+# -*- encoding: utf-8 -*-
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Common utilities and base class for all software tests.
+"""
+
+import itertools
+import functools
+import os
+import re
+import shutil
+import subprocess
+import tempfile
+
+from lmi.test.lmibase import LmiTestCase
+import package
+import repository
+import reposetup
+
+RE_REPOPKG = re.compile(r'^(?P<repo>[a-z-]+)#(?P<pkg_name>\w+.*|\*)', re.I)
+
+def test_with_repos(*enable_repos, **repos_dict):
+ """
+ Decorator factory for test case methods. It enables or disables
+ specified repositories for the time of method execution.
+
+ It accepts shortened form of test repositories e.g. *stable* instead of
+ *openlmi-sw-test-repo-stable*.
+
+ Usage: ::
+
+ @test_with_repos('stable', **{'updates' : False})
+ def test_method(self):
+ pass
+
+ Repository names that shall be enabled can be passed as string arguments.
+ Those that shall be disabled need to be passed as keyword arguments with
+ ``False`` assigned to them.
+ """
+
+ def _decorator(func):
+ """ Returns wrapped function. """
+ @functools.wraps(func)
+ def _wrapper(self, *args, **kwargs):
+ """ This is a wrapper taking care of repository enablement. """
+ to_enable = set(reposetup.full_repo_name(r) for r in enable_repos)
+ to_enable.update(
+ set(reposetup.full_repo_name(r)
+ for r, v in repos_dict.items() if v))
+ for repoid in self.repodb:
+ enable = ( repoid in to_enable
+ or bool(repos_dict.get(repoid, False)))
+ repo = self.repodb[repoid]
+ repo.refresh()
+ if enable != repo.status:
+ repository.set_repo_enabled(repo, enable)
+ repo.refresh()
+ return func(self, *args, **kwargs)
+
+ return _wrapper
+
+ return _decorator
+
+def test_with_packages(*install_pkgs, **enable_dict):
+ """
+ Decorator factory for test case methods. Returned decorator takes care of
+ installation or removal of specified packages for the time of method's
+ execution.
+
+ Packages needs to be selected from testing database. They are specified in
+ form: ::
+
+ repository#package
+
+ Where *repository* marks its full or shortened name. The same applies to
+ packages ( *pkg1* instead of *openlmi-sw-test-pkg1* can be written).
+
+ Star ``*`` is also accepted in place of package name marking any package
+ present in particular repository.
+
+ When supplied as a key-value pairs, value denote whether package (key)
+ shall be installed or removed. Packages to be removed may also omit
+ repository part.
+
+ Usage: ::
+
+ @test_with_packages('stable#pkg1', **{
+ 'updates#pkg2' : True,
+ 'pkg3' : False
+ })
+ def test_method(self):
+ pass
+
+ Example above shows two possible ways of specifying packages to install.
+ Those are:
+ * openlmi-sw-test-pkg1 from stable repository
+ * openlmi-sw-test-pkg2 from updates repository
+ Package openlmi-sw-test-pkg3 will be removed regardless of the source
+ repository.
+ """
+
+ def _decorator(func):
+ @functools.wraps(func)
+ def _wrapper(self, *args, **kwargs):
+ to_install = []
+ to_remove = set()
+ for repopkg in itertools.chain(install_pkgs, enable_dict.keys()):
+ match = RE_REPOPKG.match(repopkg)
+ if match:
+ repo = self.repodb[
+ reposetup.full_repo_name(match.group('repo'))]
+ pkg_name = match.group('pkg_name')
+ if pkg_name == '*':
+ pkgs = list(repo.packages)
+ else:
+ pkgs = [repo[reposetup.full_pkg_name(pkg_name)]]
+ elif enable_dict.get(repopkg, True) is False:
+ # package is given as string (with just name)
+ repopkg = reposetup.full_pkg_name(repopkg)
+ if package.is_pkg_installed(repopkg):
+ present = []
+ # remove any instances of Package with the same name
+ # from *to_remove* set
+ for pkg in to_remove:
+ if ( isinstance(pkg, package.Package)
+ and pkg.name == repopkg):
+ present.append(pkg)
+ for pkg in present:
+ to_remove.remove(pkg)
+ # now let's add the pkg name
+ to_remove.add(repopkg)
+ continue
+ else:
+ raise ValueError(
+ 'invalid format of repo#package string: "%s"'
+ % repopkg)
+ for pkg in pkgs:
+ installed = package.is_pkg_installed(pkg)
+ if installed and not enable_dict.get(repopkg, True):
+ if pkg.name not in to_remove:
+ to_remove.add(pkg)
+ elif not installed and enable_dict.get(repopkg, True):
+ to_install.append(pkg)
+ for pkg in to_remove:
+ package.remove_pkg(pkg)
+ for pkg in to_install:
+ if package.is_pkg_installed(pkg.name):
+ package.remove_pkg(pkg.name)
+ package.install_pkg(pkg)
+ return func(self, *args, **kwargs)
+
+ return _wrapper
+
+ return _decorator
+
+class SwTestCase(LmiTestCase):
+ """
+ Base class for all LMI Software test classes.
+
+ There are few important class properties to note:
+
+ ``repodb`` : dictionary
+ Is a dictionary of testing repositories. These are created upon
+ set up of this class. It has following format: ::
+
+ ( repoid, repository)
+
+ Where repository is an instance of
+ ;py:class:`repository.Repository`.
+ ``other_repos`` : dictionary
+ Has the same format but contains other repositories not present in
+ ``repodb``. These are disabled upon class's set up and re-enabled
+ on its tear down.
+
+ These additional environment variables affects the execution:
+
+ ``LMI_SOFTWARE_YUM_REPOS_DIR``
+ Absolute path to directory used to store yum repository
+ configuration files.
+ ``LMI_SOFTWARE_DB_CACHE``
+ File path to a serialized testing database which was cached in
+ previous runs.
+ ``LMI_SOFTWARE_CLEANUP_CACHE`` : defaults to '1'
+ Boolean flag indicating whether, the cache shall be deleted. This
+ includes temporary directory with testing repositories and their
+ configuration files and serialized database file.
+ """
+ #: Define in subclass.
+ KEYS = tuple()
+ #: When running multiple test modules under same process, the test
+ #: repository will be cached until last test case is destroyed. This
+ #: greatly speeds up the testing.
+ TEST_CASES_INSTANTIATED = 0
+
+ def __init__(self, *args, **kwargs):
+ LmiTestCase.__init__(self, *args, **kwargs)
+ self.longMessage = True
+
+ def get_repo(self, repoid):
+ """
+ Get repository object from its name. Prefer testin repositories.
+ Accepts shortened names of testing repositories.
+
+ :param string repoid: Repository id which may be shortened in case of
+ testing one.
+ :returns: Repository object.
+ :rtype: :py:class:`repository.Repository`
+ """
+ try:
+ return self.repodb[repoid]
+ except KeyError:
+ try:
+ return self.repodb[reposetup.full_repo_name(repoid)]
+ except KeyError:
+ return self.other_repos[repoid]
+
+ @classmethod
+ def setUpClass(cls):
+ LmiTestCase.setUpClass.im_func(cls)
+ # This applies to all the commands whose output needs to be parsed.
+ # Make sure we don't need to deal with localisations.
+ os.environ['LANG'] = 'C'
+ SwTestCase.yum_repos_dir = os.environ.get(
+ 'LMI_SOFTWARE_YUM_REPOS_DIR', '/etc/yum.repos.d')
+ SwTestCase._cleanup_db = (
+ os.environ.get('LMI_SOFTWARE_CLEANUP_CACHE', '1')
+ in ('1', 'yes', 'on', 'true'))
+ if not getattr(SwTestCase, 'repodb', False):
+ if os.environ.get('LMI_SOFTWARE_DB_CACHE', None):
+ db_cache = os.environ['LMI_SOFTWARE_DB_CACHE']
+ SwTestCase.repos_dir, SwTestCase.repodb, \
+ SwTestCase.other_repos = \
+ reposetup.load_test_database(db_cache)
+ else:
+ SwTestCase.repos_dir = tempfile.mkdtemp()
+ SwTestCase.repodb, SwTestCase.other_repos = \
+ reposetup.make_object_database(SwTestCase.repos_dir)
+ if not getattr(SwTestCase, '_restore_repos', []):
+ SwTestCase._restore_repos = []
+ for repoid in repository.get_repo_list('enabled'):
+ if repoid in SwTestCase.other_repos:
+ SwTestCase._restore_repos.append(repoid)
+ repository.set_repo_enabled(repoid, False)
+ SwTestCase.other_repos[repoid].refresh()
+ SwTestCase.TEST_CASES_INSTANTIATED += 1
+
+ @classmethod
+ def tearDownClass(cls):
+ if SwTestCase.TEST_CASES_INSTANTIATED <= 1:
+ to_uninstall = set()
+ for repo in SwTestCase.repodb.values():
+ for pkg in repo.packages:
+ to_uninstall.add(pkg.name)
+ if not SwTestCase._cleanup_db:
+ repository.set_repo_enabled(repo.repoid, False)
+ else:
+ os.remove(repo.config_path)
+ to_uninstall = [ p for p in to_uninstall
+ if package.is_pkg_installed(p)]
+ if to_uninstall:
+ subprocess.call([
+ '/usr/bin/rpm', '--quiet', '-e'] + to_uninstall)
+ for repoid in cls._restore_repos:
+ repository.set_repo_enabled(repoid, True)
+ if SwTestCase._cleanup_db:
+ shutil.rmtree(cls.repos_dir)
+ if os.environ.get('LMI_SOFTWARE_DB_CACHE', None):
+ os.remove(os.environ['LMI_SOFTWARE_DB_CACHE'])
+ cls.repodb.clear()
+ SwTestCase.TEST_CASES_INSTANTIATED -= 1
+
diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py
index 12db36e..2b05c2d 100644
--- a/src/software/test/test_hosted_software_collection.py
+++ b/src/software/test/test_hosted_software_collection.py
@@ -19,30 +19,26 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_MemberOfSoftwareCollection provider.
+Unit tests for ``LMI_MemberOfSoftwareCollection`` provider.
"""
import pywbem
import unittest
-import base
+import swbase
-class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
+class TestHostedSoftwareCollection(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on ``LMI_HostedSoftwareCollection``.
"""
CLASS_NAME = "LMI_HostedSoftwareCollection"
KEYS = ("Antecedent", "Dependent")
- @classmethod
- def needs_pkgdb(cls):
- return False
-
def make_op(self):
"""
- @param ses SoftwareElementState property value
- @return object path of SoftwareIdentity
+ :returns Object path of ``LMI_HostedSoftwareCollection``.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"Antecedent" : self.system_iname,
@@ -54,7 +50,7 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_HostedSoftwareCollection``.
"""
objpath = self.make_op()
inst = objpath.to_instance()
@@ -80,7 +76,7 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
def test_enum_instances(self):
"""
- Tests EnumInstances call on installed packages.
+ Test ``EnumInstances()`` call on ``LMI_HostedSoftwareCollection``.
"""
objpath = self.make_op()
insts = self.cim_class.instances()
@@ -93,16 +89,16 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames call on installed packages.
+ Test ``EnumInstanceNames`` call on ``LMI_HostedSoftwareCollection``.
"""
objpath = self.make_op()
inames = self.cim_class.instance_names()
self.assertEqual(1, len(inames))
self.assertCIMNameEqual(objpath, inames[0])
- def test_get_antecedent_referents(self):
+ def test_get_computer_system_collections(self):
"""
- Test ReferenceNames for ComputerSystem.
+ Try to get collection associated to computer system.
"""
objpath = self.make_op()
refs = objpath.Antecedent.to_instance().associator_names(
@@ -114,9 +110,9 @@ class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
ref = refs[0]
self.assertCIMNameEqual(objpath.Dependent, ref)
- def test_get_dependent_referents(self):
+ def test_get_collection_computer_systems(self):
"""
- Test ReferenceNames for SystemSoftwareCollection.
+ Try to get computer system associated to software collection.
"""
objpath = self.make_op()
refs = objpath.Dependent.to_instance().associator_names(
diff --git a/src/software/test/test_hosted_software_identity_resource.py b/src/software/test/test_hosted_software_identity_resource.py
index 09e09f2..77c9342 100644
--- a/src/software/test/test_hosted_software_identity_resource.py
+++ b/src/software/test/test_hosted_software_identity_resource.py
@@ -19,33 +19,28 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_HostedSoftwareIdentityResource provider.
+Unit tests for ``LMI_HostedSoftwareIdentityResource`` provider.
"""
-import pywbem
import unittest
-import base
+import swbase
-class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase):
+ENABLED_STATE_DISABLED = 3
+ENABLED_STATE_ENABLED = 2
+
+class TestHostedSoftwareIdentityResource(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on ``LMI_HostedSoftwareIdentityResource.``
"""
CLASS_NAME = "LMI_HostedSoftwareIdentityResource"
KEYS = ("Antecedent", "Dependent")
- @classmethod
- def needs_pkgdb(cls):
- return False
-
- @classmethod
- def needs_repodb(cls):
- return True
-
def make_op(self, repo):
"""
- @return object path of HostedSoftwareIdentityResource association
+ :returns Object path of ``LMI_HostedSoftwareIdentityResource``.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"Antecedent" : self.system_iname,
@@ -58,19 +53,25 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase):
})
})
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : True,
+ 'updates-testing' : False,
+ 'misc' : False
+ })
def test_get_instance(self):
"""
- Tests GetInstance call on repositories from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_HostedSoftwareIdentityResource``.
"""
- for repo in self.repodb:
+ for repo in self.repodb.values():
objpath = self.make_op(repo)
inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ "GetInstance succeeds for repo %s" % repo.repoid)
self.assertCIMNameEqual(objpath, inst.path,
"Object paths should match for repo %s" % repo.repoid)
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- 'OP is missing \"%s\" key for repo "%s".' %
- (key, repo.repoid))
self.assertCIMNameEqual(
getattr(inst, key), getattr(inst.path, key),
'Key property "%s" does not match for repo "%s"!' %
@@ -78,63 +79,158 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase):
def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames call on repositories from our cache.
+ Test ``EnumInstanceNames()`` call on
+ ``LMI_HostedSoftwareIdentityResource``.
"""
inames = self.cim_class.instance_names()
- repos = { r.repoid: r for r in self.repodb }
- self.assertEqual(len(repos), len(inames))
+ repos = set(r for r in self.repodb)
+ repos.update([r for r in self.other_repos])
+ self.assertEqual(len(inames), len(repos))
for iname in inames:
self.assertIn(iname.Dependent.Name, repos)
- objpath = self.make_op(repos[iname.Dependent.Name])
- self.assertCIMNameEqual(objpath, iname)
+ repoid = iname.Dependent.Name
+ if repoid in self.repodb:
+ repo = self.repodb[repoid]
+ else:
+ repo = self.other_repos[repoid]
+ objpath = self.make_op(repo)
+ self.assertCIMNameEqual(iname, objpath)
+ self.assertEqual(set(iname.key_properties()), set(self.KEYS))
+ repos.remove(iname.Dependent.Name)
+ self.assertEqual(len(repos), 0)
def test_enum_instances(self):
"""
- Tests EnumInstanceNames call on repositories from our cache.
+ Test ``EnumInstanceNames()`` call on
+ ``LMI_HostedSoftwareIdentityResource``.
"""
- inames = self.cim_class.instances()
- repos = { r.repoid: r for r in self.repodb }
- self.assertEqual(len(repos), len(inames))
- for inst in inames:
+ insts = self.cim_class.instances()
+ repos = set(r for r in self.repodb)
+ repos.update([r for r in self.other_repos])
+ self.assertEqual(len(insts), len(repos))
+ for inst in insts:
repoid = inst.Dependent.Name
self.assertIn(repoid, repos)
- objpath = self.make_op(repos[repoid])
- self.assertCIMNameEqual(objpath, inst.path)
+ if repoid in self.repodb:
+ repo = self.repodb[repoid]
+ else:
+ repo = self.other_repos[repoid]
+ objpath = self.make_op(repo)
+ self.assertCIMNameEqual(inst.path, objpath)
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- 'OP is missing \"%s\" key for repo "%s".' % (key, repoid))
self.assertCIMNameEqual(
getattr(inst, key), getattr(inst.path, key),
'Key property "%s" does not match for repo "%s"!' %
(key, repoid))
+ repos.remove(repoid)
+ self.assertEqual(len(repos), 0)
- def test_get_antecedent_referents(self):
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : True,
+ 'updates-testing' : False,
+ 'misc' : False
+ })
+ def test_get_computer_system_repositories(self):
"""
- Test ReferenceNames for ComputerSystem.
+ Try to get repositories associated with computer system.
"""
- if not self.repodb:
- return
- repo = self.repodb[0]
- objpath = self.make_op(repo)
- refs = objpath.Antecedent.to_instance().associator_names(
+ objpath = self.make_op(self.get_repo('stable'))
+ refs = objpath.Antecedent.to_instance().associators(
AssocClass=self.CLASS_NAME,
Role="Antecedent",
ResultRole="Dependent",
ResultClass="LMI_SoftwareIdentityResource")
- repos = {r.repoid: r for r in self.repodb}
+ repos = set(r for r in self.repodb)
+ repos.update(self.other_repos.keys())
+ self.assertEqual(len(refs), len(repos))
for ref in refs:
- self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.path.namespace, 'root/cimv2')
self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource")
- objpath = self.make_op(repos[ref.Name])
+ repoid = ref.Name
+ if repoid in self.repodb:
+ repo = self.repodb[repoid]
+ else:
+ repo = self.other_repos[repoid]
+ objpath = self.make_op(repo)
+ self.assertCIMNameEqual(ref.path, objpath.Dependent)
+ self.assertEqual(set(ref.path.key_properties()),
+ set(["CreationClassName", "Name",
+ "SystemCreationClassName", "SystemName"]))
+ if repo.status:
+ self.assertEqual(ref.EnabledState, ENABLED_STATE_ENABLED,
+ "EnabledState does not match for repo %s" % repo.repoid)
+ else:
+ self.assertEqual(ref.EnabledState, ENABLED_STATE_DISABLED,
+ "EnabledState does not match for repo %s" % repo.repoid)
+ repos.remove(ref.Name)
+ self.assertEqual(len(repos), 0)
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : True,
+ 'updates-testing' : False,
+ 'misc' : False
+ })
+ def test_get_computer_system_repository_names(self):
+ """
+ Try to get repository names associated with computer system.
+ """
+ objpath = self.make_op(self.get_repo('stable'))
+ refs = objpath.Antecedent.to_instance().associator_names(
+ AssocClass=self.CLASS_NAME,
+ Role="Antecedent",
+ ResultRole="Dependent",
+ ResultClass="LMI_SoftwareIdentityResource")
+ repos = set(r for r in self.repodb)
+ repos.update(self.other_repos.keys())
+ self.assertEqual(len(refs), len(repos))
+ for ref in refs:
+ repoid = ref.Name
+ if repoid in self.repodb:
+ repo = self.repodb[repoid]
+ else:
+ repo = self.other_repos[repoid]
+ objpath = self.make_op(repo)
self.assertCIMNameEqual(objpath.Dependent, ref)
- del repos[ref.Name]
- self.assertEqual(0, len(repos))
+ self.assertEqual(set(ref.key_properties()),
+ set(["CreationClassName", "Name",
+ "SystemCreationClassName", "SystemName"]))
+ repos.remove(ref.Name)
+ self.assertEqual(len(repos), 0)
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : True,
+ 'updates-testing' : False,
+ 'misc' : False
+ })
+ def test_get_repository_computer_systems(self):
+ """
+ Try to get computer system associated with repository.
+ """
+ for repo in self.repodb.values():
+ objpath = self.make_op(repo=repo)
+ refs = objpath.Dependent.to_instance().associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Dependent",
+ ResultRole="Antecedent",
+ ResultClass=self.system_cs_name)
+ self.assertEqual(len(refs), 1)
+ self.assertCIMNameEqual(refs[0].path, objpath.Antecedent)
- def test_get_dependent_referents(self):
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : True,
+ 'updates-testing' : False,
+ 'misc' : False
+ })
+ def test_get_repository_computer_system_names(self):
"""
- Test ReferenceNames for repository.
+ Try to get computer system names associated with repository.
"""
- for repo in self.repodb:
+ for repo in self.repodb.values():
objpath = self.make_op(repo=repo)
refs = objpath.Dependent.to_instance().associator_names(
AssocClass=self.CLASS_NAME,
@@ -142,7 +238,7 @@ class TestHostedSoftwareIdentityResource(base.SoftwareBaseTestCase):
ResultRole="Antecedent",
ResultClass=self.system_cs_name)
self.assertEqual(len(refs), 1)
- self.assertCIMNameEqual(objpath.Antecedent, refs[0])
+ self.assertCIMNameEqual(refs[0], objpath.Antecedent)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_installed_software_identity.py b/src/software/test/test_installed_software_identity.py
index ee79318..3535294 100644
--- a/src/software/test/test_installed_software_identity.py
+++ b/src/software/test/test_installed_software_identity.py
@@ -19,93 +19,144 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_InstalledSoftwareIdentity provider.
+Unit tests for ``LMI_InstalledSoftwareIdentity`` provider.
"""
import pywbem
import unittest
+from lmi.shell import LMIDeletedObjectError
from lmi.test.lmibase import enable_lmi_exceptions
-from lmi.test.util import mark_dangerous
from lmi.test.util import mark_tedious
-import base
-import rpmcache
+import package
+import swbase
-class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase):
+class TestInstalledSoftwareIdentity(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on `LMI_InstalledSoftwareIdentity``.
"""
CLASS_NAME = "LMI_InstalledSoftwareIdentity"
KEYS = ("InstalledSoftware", "System")
- def make_op(self, pkg, newer=True):
+ def make_op(self, pkg):
"""
- @return object path of InstalledSoftwareIdentity association
+ :returns Object path of ``LMI_InstalledSoftwareIdentity``.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"System" : self.system_iname,
"InstalledSoftware" : self.ns.LMI_SoftwareIdentity. \
new_instance_name({
- "InstanceID" : 'LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(newer=newer, with_epoch="ALWAYS")
+ "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra
})
})
+ @swbase.test_with_packages('stable#pkg1')
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_InstalledSoftwareIdentity``.
"""
- for pkg in self.safe_pkgs:
- objpath = self.make_op(pkg)
- inst = objpath.to_instance()
- self.assertCIMNameEqual(inst.path, objpath,
- "Object paths should match for package %s"%pkg)
- for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- self.assertCIMNameEqual(
- getattr(inst, key), getattr(inst.path, key))
- self.assertCIMNameEqual(objpath, inst.path,
- "Object paths should match for package %s"%pkg)
-
- # try it now with CIM_ComputerSystem
- system = objpath.System.copy()
- objpath.System.wrapped_object.classname = "CIM_ComputerSystem"
- objpath.System.wrapped_object.CreationClassName = \
- "CIM_ComputerSystem"
-
- inst = objpath.to_instance()
- self.assertCIMNameEqual(objpath, inst.path,
- "Object paths should match for package %s"%pkg)
- for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- self.assertCIMNameEqual(system, inst.System)
- self.assertCIMNameEqual(objpath, inst.path)
+ repo = self.get_repo('stable')
+ pkg = repo['pkg1']
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
+ for key in self.KEYS:
+ self.assertCIMNameEqual(
+ getattr(inst, key), getattr(inst.path, key))
+ self.assertCIMNameEqual(inst.path, objpath,
+ "Object paths should match for package %s" % pkg)
+
+ # try it now with CIM_ComputerSystem
+ system = objpath.System.copy()
+ objpath.System.wrapped_object.classname = "CIM_ComputerSystem"
+ objpath.System.wrapped_object.CreationClassName = \
+ "CIM_ComputerSystem"
+
+ inst = objpath.to_instance()
+ self.assertCIMNameEqual(inst.path, objpath,
+ "Object paths should match for package %s" % pkg)
+ for key in self.KEYS:
+ self.assertIn(key, inst.properties(),
+ "OP is missing \"%s\" key for package %s" % (key, pkg))
+ self.assertCIMNameEqual(inst.System, system)
+ self.assertCIMNameEqual(inst.path, objpath)
+
+ @enable_lmi_exceptions
+ @swbase.test_with_packages(**{'stable#pkg2' : False })
+ def test_get_instance_not_installed(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_InstalledSoftwareIdentity``.
+ """
+ repo = self.get_repo('stable')
+ pkg = repo['pkg2']
+ objpath = self.make_op(pkg)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
- def test_enum_instance_names_safe(self):
+ @mark_tedious
+ @swbase.test_with_packages(**{
+ 'pkg3' : False,
+ 'pkg4' : False,
+ 'stable#pkg1' : True,
+ 'stable#pkg2' : False,
+ 'updates#*' : False,
+ 'updates-testing#pkg1' : False,
+ 'updates-testing#pkg2' : True,
+ 'misc#conflict1' : False,
+ 'misc#conflict2' : False,
+ 'misc#depend1' : False,
+ 'misc#depend2' : False,
+ 'misc#pkg1' : False,
+ 'misc#funny-version' : True,
+ 'misc#funny-release' : True,
+ })
+ def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames call on installed packages.
+ Test ``EnumInstanceNames()`` call on ``LMI_InstalledSoftwareIdentity``.
"""
inames = self.cim_class.instance_names()
self.assertGreater(len(inames), 0)
- objpath = self.make_op(self.safe_pkgs[0])
+ objpath = self.make_op(self.get_repo('stable')['pkg1'])
for iname in inames:
self.assertEqual(iname.namespace, 'root/cimv2')
self.assertEqual(iname.classname, self.CLASS_NAME)
self.assertEqual(sorted(iname.key_properties()), sorted(self.KEYS))
self.assertCIMNameEqual(objpath.System, iname.System)
- nevra_set = set(i.InstalledSoftware.InstanceID for i in inames)
- for pkg in self.safe_pkgs:
- nevra = 'LMI:LMI_SoftwareIdentity:' \
- + pkg.get_nevra(with_epoch="ALWAYS")
- self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra)
+ nevra_set = set(i.InstalledSoftware.InstanceID[
+ len('LMI:LMI_SoftwareIdentity:'):] for i in inames)
+ db_installed = 0
+ for repo in self.repodb.values():
+ for pkg in repo.packages:
+ if package.is_pkg_installed(pkg):
+ self.assertTrue(pkg.nevra in nevra_set)
+ db_installed += 1
+ else:
+ self.assertFalse(pkg.nevra in nevra_set)
+ # test that number of tested packages installed really match number
+ # of requested packages
+ self.assertEqual(db_installed, 4)
@mark_tedious
+ @swbase.test_with_packages(**{
+ 'pkg3' : False,
+ 'pkg4' : False,
+ 'stable#pkg1' : True,
+ 'stable#pkg2' : False,
+ 'updates#*' : False,
+ 'updates-testing#pkg1' : False,
+ 'updates-testing#pkg2' : True,
+ 'misc#conflict1' : False,
+ 'misc#conflict2' : False,
+ 'misc#depend1' : False,
+ 'misc#depend2' : False,
+ 'misc#pkg1' : False,
+ 'misc#funny-version' : True,
+ 'misc#funny-release' : True,
+ })
def test_enum_instances(self):
"""
- Tests EnumInstances call on installed packages.
+ Test ``EnumInstances() call on ``LMI_InstalledSoftwareIdentity``.
"""
insts = self.cim_class.instances()
self.assertGreater(len(insts), 0)
@@ -114,94 +165,96 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase):
for key in self.KEYS:
self.assertCIMNameEqual(
getattr(inst, key), getattr(inst.path, key))
- nevra_set = set(i.InstalledSoftware.InstanceID for i in insts)
- for pkg in self.safe_pkgs:
- nevra = 'LMI:LMI_SoftwareIdentity:' \
- + pkg.get_nevra(with_epoch="ALWAYS")
- self.assertTrue(nevra in nevra_set, "Missing pkg %s in nevra_set."
- % nevra)
-
- @unittest.skip("dangerous tests are not reliable")
- @mark_tedious
- @mark_dangerous
- def test_enum_instance_names(self):
+ nevra_set = set(i.InstalledSoftware.InstanceID[
+ len('LMI:LMI_SoftwareIdentity:'):] for i in insts)
+ db_installed = 0
+ for repo in self.repodb.values():
+ for pkg in repo.packages:
+ if package.is_pkg_installed(pkg):
+ self.assertTrue(pkg.nevra in nevra_set)
+ db_installed += 1
+ else:
+ self.assertFalse(pkg.nevra in nevra_set)
+ # test that number of tested packages installed really match number
+ # of requested packages
+ self.assertEqual(db_installed, 4)
+
+ @swbase.test_with_repos('stable',
+ **{'updates' : False, 'updates-testing' : False, 'misc' : False })
+ @swbase.test_with_packages(**{'stable#pkg1' : False})
+ def test_install_package(self):
"""
- Tests EnumInstanceNames call on dangerous packages.
+ Test package installation with ``LMI_InstalledSoftwareIdentity``.
"""
- pkg = self.dangerous_pkgs[0]
+ repo = self.get_repo('stable')
+ pkg = repo['pkg1']
+ self.assertFalse(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
- rpmcache.ensure_pkg_installed(pkg)
+ properties = {
+ "InstalledSoftware" : objpath.InstalledSoftware,
+ "System" : objpath.System
+ }
- inames1 = self.cim_class.instance_names()
- self.assertGreater(len(inames1), 1)
- self.assertIn('LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(with_epoch="ALWAYS"),
- set(i.InstalledSoftware.InstanceID for i in inames1))
-
- rpmcache.remove_pkg(pkg.name)
- inames2 = self.cim_class.instance_names()
- self.assertEqual(len(inames1), len(inames2) + 1)
- self.assertNotIn('LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(with_epoch="ALWAYS"),
- set(i.InstalledSoftware.InstanceID for i in inames2))
-
- rpmcache.install_pkg(pkg)
- inames3 = self.cim_class.instance_names()
- self.assertEqual(len(inames1), len(inames3))
- self.assertIn('LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(with_epoch="ALWAYS"),
- set(i.InstalledSoftware.InstanceID for i in inames3))
+ inst = self.cim_class.create_instance(properties)
+ self.assertTrue(package.is_pkg_installed(pkg))
+ self.assertCIMNameEqual(inst.path, objpath,
+ 'Object path does not match for %s.' % pkg)
@enable_lmi_exceptions
- @mark_dangerous
- def test_create_instance(self):
+ @swbase.test_with_repos('stable',
+ **{'updates' : False, 'updates-testing' : False, 'misc' : False })
+ @swbase.test_with_packages('stable#pkg1')
+ def test_install_already_installed(self):
"""
- Tests new instance creation for dangerous packages.
+ Try to install package that is already installed with
+ ``LMI_InstalledSoftwareIdentity``.
"""
- for pkg in self.dangerous_pkgs:
- if rpmcache.is_pkg_installed(pkg.name):
- rpmcache.remove_pkg(pkg.name)
- objpath = self.make_op(pkg)
- self.assertFalse(rpmcache.is_pkg_installed(pkg))
-
- properties = {
- "InstalledSoftware" : objpath.InstalledSoftware,
- "System" : objpath.System
- }
+ pkg = self.get_repo('stable')['pkg1']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
- inst = self.cim_class.create_instance(properties)
- self.assertTrue(rpmcache.is_pkg_installed(pkg))
- self.assertCIMNameEqual(objpath, inst.path,
- 'Object path does not match for %s.' % pkg)
-
- # try to install second time
- self.assertRaisesCIM(pywbem.CIM_ERR_ALREADY_EXISTS,
- self.cim_class.create_instance, inst.properties_dict())
+ properties = {
+ "InstalledSoftware" : objpath.InstalledSoftware,
+ "System" : objpath.System
+ }
+ # try to install second time
+ self.assertRaisesCIM(pywbem.CIM_ERR_ALREADY_EXISTS,
+ self.cim_class.create_instance, properties)
@enable_lmi_exceptions
- @mark_dangerous
- def test_delete_instance(self):
+ @swbase.test_with_packages('stable#pkg2')
+ def test_remove_installed(self):
"""
- Tests removing of dangerous packages by deleting instance name.
+ Test package removal with ``LMI_InstalledSoftwareIdentity``.
"""
- for pkg in self.dangerous_pkgs:
- self.ensure_pkg_installed(pkg)
- self.assertTrue(rpmcache.is_pkg_installed(pkg))
- objpath = self.make_op(pkg)
- inst = objpath.to_instance()
- inst.delete()
- self.assertFalse(rpmcache.is_pkg_installed(pkg),
- "Failed to delete instance for %s." % pkg)
+ pkg = self.get_repo('stable')['pkg2']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ inst.delete()
+ self.assertFalse(package.is_pkg_installed(pkg))
- # try to delete second time
- self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, inst.delete)
+ # try to delete second time
+ self.assertRaises(LMIDeletedObjectError, inst.delete)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
@mark_tedious
- def test_get_system_referents(self):
+ @swbase.test_with_packages(**{
+ 'updates#pkg3' : True,
+ 'stable#pkg1' : True,
+ 'pkg2' : False,
+ 'pkg4' : False,
+ 'updates-testing#*' : False,
+ 'misc#*' : False
+ })
+ def test_get_installed_package_names(self):
"""
- Test ReferenceNames for ComputerSystem.
+ Try to get installed packages associated with computer system
+ via ``LMI_InstalledSoftwareIdentity``.
"""
- objpath = self.make_op(self.safe_pkgs[0])
+ pkg = self.get_repo('stable')['pkg1']
+ objpath = self.make_op(pkg)
refs = objpath.System.to_instance().associator_names(
AssocClass=self.CLASS_NAME,
Role="System",
@@ -214,27 +267,38 @@ class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase):
self.assertEqual(ref.key_properties(), ["InstanceID"])
self.assertTrue(ref.InstanceID.startswith(
"LMI:LMI_SoftwareIdentity:"))
+ nevra_set = set( i.InstanceID[len('LMI:LMI_SoftwareIdentity:'):]
+ for i in refs)
- nevra_set = set(i.InstanceID for i in refs)
- for pkg in self.safe_pkgs:
- nevra = 'LMI:LMI_SoftwareIdentity:' \
- + pkg.get_nevra(with_epoch="ALWAYS")
- self.assertTrue(nevra in nevra_set, 'Missing nevra "%s".' % nevra)
+ db_installed = 0
+ for repo in self.repodb.values():
+ for pkg in repo.packages:
+ if package.is_pkg_installed(pkg):
+ self.assertTrue(pkg.nevra in nevra_set)
+ db_installed += 1
+ else:
+ self.assertFalse(pkg.nevra in nevra_set)
+ # test that number of tested packages installed really match number
+ # of requested packages
+ self.assertEqual(db_installed, 2)
+ @swbase.test_with_packages('stable#pkg1')
def test_get_installed_software_referents(self):
"""
- Test ReferenceNames for SoftwareIdentity.
- """
- for pkg in self.safe_pkgs:
- objpath = self.make_op(pkg)
- refs = objpath.InstalledSoftware.to_instance().associator_names(
- AssocClass='LMI_InstalledSoftwareIdentity',
- Role="InstalledSoftware",
- ResultRole="System",
- ResultClass=self.system_cs_name)
- self.assertEqual(len(refs), 1)
- ref = refs[0]
- self.assertCIMNameEqual(objpath.System, ref)
+ Try to get computer system associated with installed package via
+ ``LMI_InstalledSoftwareIdentity``.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
+ refs = objpath.InstalledSoftware.to_instance().associator_names(
+ AssocClass='LMI_InstalledSoftwareIdentity',
+ Role="InstalledSoftware",
+ ResultRole="System",
+ ResultClass=self.system_cs_name)
+ self.assertEqual(len(refs), 1)
+ ref = refs[0]
+ self.assertCIMNameEqual(objpath.System, ref)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py
index e51f56d..6acb9e0 100644
--- a/src/software/test/test_member_of_software_collection.py
+++ b/src/software/test/test_member_of_software_collection.py
@@ -19,25 +19,25 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_MemberOfSoftwareCollection provider.
+Unit tests for ``LMI_MemberOfSoftwareCollection`` provider.
"""
-import pywbem
import unittest
-import base
+import swbase
-class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase):
+class TestMemberOfSoftwareCollection(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test for ``LMI_MemberOfSoftwareCollection``.
"""
CLASS_NAME = "LMI_MemberOfSoftwareCollection"
KEYS = ("Collection", "Member")
- def make_op(self, pkg, newer=True):
+ def make_op(self, pkg):
"""
- @return object path of MembeOfSoftwareCollection association
+ :returns Object path of ``LMI_MemberOfSoftwareCollection``.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"Collection" : self.ns.LMI_SystemSoftwareCollection. \
@@ -45,46 +45,228 @@ class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase):
"InstanceID" : "LMI:LMI_SystemSoftwareCollection"
}),
"Member" : self.ns.LMI_SoftwareIdentity.new_instance_name({
- "InstanceID" : 'LMI:LMI_SoftwareIdentity:' \
- + pkg.get_nevra(newer=newer, with_epoch="ALWAYS")
+ "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra
})
})
+ @swbase.test_with_repos('stable', 'misc', 'updates-testing')
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_MemberOfSoftwareCollection``.
"""
- for pkg in self.safe_pkgs:
+ for repoid in ('stable', 'misc', 'updates-testing'):
+ repo = self.get_repo(repoid)
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'Package "%s" is available in SystemSoftwareCollection'
+ % pkg)
+ self.assertCIMNameEqual(inst.path, objpath,
+ "Object paths should match for package %s" % pkg)
+ self.assertEqual(set(inst.path.key_properties()),
+ set(self.KEYS))
+
+ @swbase.test_with_repos(misc=False)
+ def test_get_instance_with_disabled_repo(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_MemberOfSoftwareCollection``
+ with package from disabled repository.
+ """
+ repo = self.get_repo('misc')
+ for pkg in repo.packages:
objpath = self.make_op(pkg)
inst = objpath.to_instance()
- self.assertNotEqual(inst, None,
- 'Package "%s" not available in SystemSoftwareCollection'
- % str(pkg))
- self.assertCIMNameEqual(inst.path, objpath,
- "Object paths should match for package %s"%pkg)
+ self.assertEqual(inst, None,
+ 'package should not be accessible from disabled repository.')
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates': False,
+ 'updates-testing': True,
+ 'misc' : False
+ })
+ @swbase.test_with_packages(**{
+ 'stable#pkg1' : True,
+ 'updates#pkg2' : True,
+ 'pkg3' : False,
+ 'pkg4' : False
+ })
+ def test_enum_instances(self):
+ """
+ Test ``EnumInstances()`` call on ``LMI_MemberOfSoftwareCollection``.
+ """
+ insts = self.cim_class.instances()
+ self.assertGreater(len(insts), 0)
+ for inst in insts:
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
+ self.assertCIMNameEqual(
+ getattr(inst, key), getattr(inst.path, key))
+ nevra_set = set(i.Member.InstanceID[
+ len('LMI:LMI_SoftwareIdentity:'):] for i in insts)
+ for repo in self.repodb.values():
+ if not repo.repoid in (
+ 'openlmi-sw-test-repo-stable',
+ 'openlmi-sw-test-repo-updates-testing'):
+ continue
+ for pkg in repo.packages:
+ self.assertIn(pkg.nevra, nevra_set)
+ nevra_set.remove(pkg.nevra)
+ self.assertEqual(len(nevra_set), 0)
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates': False,
+ 'updates-testing': True,
+ 'misc' : False
+ })
+ def test_enum_instance_names(self):
+ """
+ Test ``EnumInstanceNames()`` call on ``LMI_MemberOfSoftwareCollection``.
+ """
+ inames = self.cim_class.instance_names()
+ self.assertGreater(len(inames), 0)
+ objpath = self.make_op(self.get_repo('stable')['pkg1'])
+ for iname in inames:
+ self.assertEqual(iname.namespace, 'root/cimv2')
+ self.assertEqual(iname.classname, self.CLASS_NAME)
+ self.assertEqual(sorted(iname.key_properties()), sorted(self.KEYS))
+ self.assertCIMNameEqual(objpath.Collection, iname.Collection)
+ nevra_set = set(i.Member.InstanceID[
+ len('LMI:LMI_SoftwareIdentity:'):] for i in inames)
+ for repo in self.repodb.values():
+ if not repo.repoid in (
+ 'openlmi-sw-test-repo-stable',
+ 'openlmi-sw-test-repo-updates-testing'):
+ continue
+ for pkg in repo.packages:
+ self.assertIn(pkg.nevra, nevra_set)
+ nevra_set.remove(pkg.nevra)
+ self.assertEqual(len(nevra_set), 0)
+
+
+ @swbase.test_with_repos('stable')
+ def test_get_package_collections(self):
+ """
+ Try to get instance of software collection associated with package.
+ """
+ repo = self.get_repo('stable')
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg)
+ inst = objpath.Member.to_instance()
+ self.assertNotEqual(inst, None,
+ "package %s is available" % pkg)
+ refs = inst.associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Member",
+ ResultRole="Collection",
+ ResultClass="LMI_SystemSoftwareCollection")
+ self.assertEqual(len(refs), 1,
+ "exactly one collection is associated with package %s" % pkg)
+ self.assertCIMNameEqual(refs[0].path, objpath.Collection)
+ self.assertEqual(refs[0].InstanceID, objpath.Collection.InstanceID)
- def test_get_member_referents(self):
+ @swbase.test_with_repos('stable')
+ def test_get_package_collection_names(self):
"""
- Test ReferenceNames for SoftwareIdentity.
+ Try to get instance of software collection associated with package.
"""
- for pkg in self.safe_pkgs:
+ repo = self.get_repo('stable')
+ for pkg in repo.packages:
objpath = self.make_op(pkg)
- member = objpath.Member.to_instance()
- self.assertNotEqual(member, None,
- 'Package "%s" not found' % str(pkg))
- refs = member.associator_names(
+ inst = objpath.Member.to_instance()
+ self.assertNotEqual(inst, None,
+ "package %s is available" % pkg)
+ refs = inst.associator_names(
AssocClass=self.CLASS_NAME,
Role="Member",
ResultRole="Collection",
ResultClass="LMI_SystemSoftwareCollection")
self.assertEqual(len(refs), 1,
- 'Package "%s" not available in SystemSoftwareCollection'
- % str(pkg))
- ref = refs[0]
- self.assertCIMNameEqual(objpath.Collection, ref)
+ "exactly one collection is associated with package %s" % pkg)
+ self.assertCIMNameEqual(refs[0], objpath.Collection)
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : False,
+ 'updates-testing' : True,
+ 'misc' : False
+ })
+ def test_get_collection_package_names(self):
+ """
+ Try to get package names associated to software collection.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ objpath = self.make_op(pkg)
+ collection = objpath.Collection.to_instance()
+ self.assertNotEqual(collection, None,
+ "GetInstance call succeeds on collection")
+ refs = collection.associator_names(
+ AssocClass=self.CLASS_NAME,
+ Role="Collection",
+ ResultRole="Member",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertGreater(len(refs), 0)
+
+ for ref in refs:
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(ref.key_properties(), ["InstanceID"])
+ self.assertTrue(ref.InstanceID.startswith(
+ "LMI:LMI_SoftwareIdentity:"))
+
+ nevra_set = set( i.InstanceID[len('LMI:LMI_SoftwareIdentity:'):]
+ for i in refs)
+ for repo in self.repodb.values():
+ if not repo.repoid in (
+ 'openlmi-sw-test-repo-stable',
+ 'openlmi-sw-test-repo-updates-testing'):
+ continue
+ for pkg in repo.packages:
+ self.assertIn(pkg.nevra, nevra_set)
+ nevra_set.remove(pkg.nevra)
+ self.assertEqual(len(nevra_set), 0)
+
+ @swbase.test_with_repos(**{
+ 'stable' : True,
+ 'updates' : False,
+ 'updates-testing' : True,
+ 'misc' : False
+ })
+ def test_get_collection_packages(self):
+ """
+ Try to get packages associated to software collection.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ objpath = self.make_op(pkg)
+ collection = objpath.Collection.to_instance()
+ self.assertNotEqual(collection, None,
+ "GetInstance call succeeds on collection")
+ refs = collection.associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Collection",
+ ResultRole="Member",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertGreater(len(refs), 0)
+
+ for ref in refs:
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(ref.path.key_properties(), ["InstanceID"])
+ self.assertTrue(ref.InstanceID.startswith(
+ "LMI:LMI_SoftwareIdentity:"))
+
+ nevra_set = set(i.ElementName for i in refs)
+ for repo in self.repodb.values():
+ if not repo.repoid in (
+ 'openlmi-sw-test-repo-stable',
+ 'openlmi-sw-test-repo-updates-testing'):
+ continue
+ for pkg in repo.packages:
+ self.assertIn(pkg.nevra, nevra_set)
+ nevra_set.remove(pkg.nevra)
+ self.assertEqual(len(nevra_set), 0)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_resource_for_software_identity.py b/src/software/test/test_resource_for_software_identity.py
index faeecb8..e44f0db 100644
--- a/src/software/test/test_resource_for_software_identity.py
+++ b/src/software/test/test_resource_for_software_identity.py
@@ -19,17 +19,16 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_ResourceForSoftwareIdentity provider.
+Unit tests for ``LMI_ResourceForSoftwareIdentity`` provider.
"""
-import pywbem
import unittest
-from lmi.test.util import mark_dangerous
-from lmi.test.util import mark_tedious
-import base
+import swbase
-class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase):
+ENABLED_STATE_ENABLED = 2
+
+class TestResourceForSoftwareIdentity(swbase.SwTestCase):
"""
Basic cim operations test.
"""
@@ -37,13 +36,10 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase):
CLASS_NAME = "LMI_ResourceForSoftwareIdentity"
KEYS = ("ManagedElement", "AvailableSAP")
- @classmethod
- def needs_repodb(cls):
- return True
-
- def make_op(self, pkg=None, newer=True, repo=None):
+ def make_op(self, pkg=None, repo=None):
"""
- @return object path of ResourceForSoftwareIdentity association
+ :returns: Object path of ``LMI_ResourceForSoftwareIdentity``
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
objpath = self.cim_class.new_instance_name({
"AvailableSAP" : self.ns.LMI_SoftwareIdentityResource. \
@@ -52,125 +48,217 @@ class TestResourceForSoftwareIdentity(base.SoftwareBaseTestCase):
"SystemCreationClassName" : self.system_cs_name,
"SystemName" : self.SYSTEM_NAME
}),
- "ManagedElement" : self.ns.LMI_SoftwareIdentity.new_instance_name({})
+ "ManagedElement" :
+ self.ns.LMI_SoftwareIdentity.new_instance_name({})
})
if repo is not None:
objpath.AvailableSAP.wrapped_object["Name"] = repo.repoid
elif pkg is not None:
- objpath.AvailableSAP.wrapped_object["Name"] = getattr(pkg,
- 'up_repo' if newer else 'repo')
+ objpath.AvailableSAP.wrapped_object["Name"] = pkg.repoid
if pkg is not None:
- objpath.ManagedElement.wrappe_object["InstanceID"] = (
- 'LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(newer=newer, with_epoch="ALWAYS"))
+ objpath.ManagedElement.wrapped_object["InstanceID"] = (
+ 'LMI:LMI_SoftwareIdentity:' + pkg.nevra)
return objpath
- @mark_dangerous
+ @swbase.test_with_repos('stable', 'updates', 'misc',
+ **{'updates-testing' : True})
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_ResourceForSoftwareIdentity``.
+ """
+ for repo in self.repodb.values():
+ self.assertTrue(repo.status)
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg, repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ self.assertEqual(set(inst.path.key_properties()),
+ set(self.KEYS))
+ for key in self.KEYS:
+ self.assertCIMNameEqual(
+ getattr(inst, key), getattr(inst.path, key),
+ 'Key property "%s" does not match for package "%s#%s"!'
+ % (key, repo.repoid, pkg))
+
+ @swbase.test_with_repos(**{'updates' : False})
+ def test_get_instance_disabled_repo(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_ResourceForSoftwareIdentity``
+ with disabled repository.
"""
- for pkg in self.dangerous_pkgs:
- objpath = self.make_op(pkg)
+ repo = self.get_repo('updates')
+ self.assertFalse(repo.status)
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg, repo)
inst = objpath.to_instance()
- self.assertCIMNameEqual(objpath, inst.path,
- "Object paths should match for package %s"%pkg)
+ self.assertNotEqual(inst, None)
+ self.assertEqual(set(inst.path.key_properties()),
+ set(self.KEYS))
for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- 'OP is missing \"%s\" key for package "%s".' % (key, pkg))
self.assertCIMNameEqual(
- getattr(inst, key), getattr(inst.path, key),
- 'Key property "%s" does not match for package "%s"!' %
- (key, pkg))
+ getattr(inst, key), getattr(inst.path, key),
+ 'Key property "%s" does not match for package "%s#%s"!' %
+ (key, repo.repoid, pkg))
- @mark_tedious
- def test_get_resource_referents(self):
+ @swbase.test_with_repos('stable')
+ def test_repo_identity_names(self):
"""
- Test ReferenceNames for AvailableSAP.
+ Test ``AssociatorNames()`` call on ``LMI_ResourceForSoftwareIdentity``.
"""
- for repo in self.repodb:
- objpath = self.make_op(repo=repo)
- if not repo.pkg_count or repo.pkg_count > 10000:
- # do not test too big repositories
- continue
- refs = objpath.AvailableSAP.to_instance().associator_names(
- AssocClass=self.CLASS_NAME,
- Role="AvailableSAP",
- ResultRole="ManagedElement",
- ResultClass="LMI_SoftwareIdentity")
- if repo.pkg_count > 0:
- self.assertGreater(len(refs), 0,
- 'repository "%s" is missing software identities'
- % repo.name)
- for ref in refs:
- self.assertEqual(ref.namespace, 'root/cimv2')
- self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
- self.assertEqual(ref.key_properties(), ["InstanceID"])
- self.assertTrue(
- ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:"))
-
- nevra_set = set(i.InstanceID for i in refs)
- # NOTE: installed packages might not be available
- for pkg, up in ((pkg, up) for pkg in self.dangerous_pkgs
- for up in (True, False)):
- nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.get_nevra(
- newer=up, with_epoch="ALWAYS")
- reponame = getattr(pkg, 'up_repo' if up else 'repo')
- if reponame == repo.repoid:
- self.assertTrue(nevra in nevra_set,
- 'Missing nevra "%s" for repo "%s".' % (nevra,
- reponame))
-
- @mark_tedious
- def test_get_resource_referents_for_disabled_repo(self):
- """
- Test ReferenceNames for AvailableSAP, which is disabled.
- """
- for repo in self.repodb:
- if repo.status:
- continue # test only disabled repositories
- objpath = self.make_op(repo=repo)
- refs = objpath.AvailableSAP.to_instance().associator_names(
+ repo = self.get_repo('stable')
+ self.assertTrue(repo.status)
+ self.assertGreater(repo.pkg_count, 0)
+
+ objpath = self.make_op(repo=repo)
+ refs = objpath.AvailableSAP.to_instance().associator_names(
+ AssocClass=self.CLASS_NAME,
+ Role="AvailableSAP",
+ ResultRole="ManagedElement",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertEqual(len(refs), repo.pkg_count,
+ 'repository "%s" is missing software identities'
+ % repo.name)
+ for ref in refs:
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(ref.key_properties(), ["InstanceID"])
+ self.assertTrue(
+ ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:"))
+
+ nevra_set = set(i.InstanceID for i in refs)
+ for pkg in repo.packages:
+ nevra = 'LMI:LMI_SoftwareIdentity:'+pkg.nevra
+ self.assertTrue(nevra in nevra_set,
+ 'Missing nevra "%s" for repo "%s".' % (nevra,
+ repo.repoid))
+ nevra_set.remove(nevra)
+ self.assertEqual(len(nevra_set), 0,
+ "all packages from repository have been listed")
+
+ @swbase.test_with_repos(stable=False)
+ def test_disabled_repo_identity_names(self):
+ """
+ Test ``AssociatorNames()`` call on ``LMI_ResourceForSoftwareIdentity``.
+ """
+ repo = self.get_repo('stable')
+ self.assertFalse(repo.status)
+ self.assertGreater(repo.pkg_count, 0)
+
+ objpath = self.make_op(repo=repo)
+ refs = objpath.AvailableSAP.to_instance().associator_names(
+ AssocClass=self.CLASS_NAME,
+ Role="AvailableSAP",
+ ResultRole="ManagedElement",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertEqual(len(refs), repo.pkg_count,
+ 'repository "%s" is missing software identities'
+ % repo.name)
+ for ref in refs:
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(ref.key_properties(), ["InstanceID"])
+ self.assertTrue(
+ ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:"))
+
+ @swbase.test_with_repos('updates')
+ def test_repo_indentities(self):
+ """
+ Test ``Associators()`` call on ``LMI_ResourceForSoftwareIdentity``.
+ """
+ repo = self.get_repo('updates')
+ self.assertTrue(repo.status)
+ self.assertGreater(repo.pkg_count, 0)
+
+ objpath = self.make_op(repo=repo)
+ refs = objpath.AvailableSAP.to_instance().associators(
+ AssocClass=self.CLASS_NAME,
+ Role="AvailableSAP",
+ ResultRole="ManagedElement",
+ ResultClass="LMI_SoftwareIdentity")
+ if repo.pkg_count:
+ self.assertGreater(len(refs), 0,
+ 'no software identities associated to repo "%s"'
+ % repo.repoid)
+ for ref in refs:
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(ref.path.key_properties(), ["InstanceID"])
+
+ nevra_set = set(i.ElementName for i in refs)
+ for pkg in repo.packages:
+ self.assertTrue(pkg.nevra in nevra_set,
+ 'Missing package "%s" in repo "%s".' % (pkg, repo.repoid))
+ nevra_set.remove(pkg.nevra)
+ self.assertEqual(len(nevra_set), 0,
+ "all packages from repository have been listed")
+
+ @swbase.test_with_repos('stable')
+ def test_get_pkg_repository_name(self):
+ """
+ Test ``AssociatorNames()`` call on ``LMI_SoftwareIdentity``.
+ """
+ repo = self.get_repo('stable')
+ self.assertTrue(repo.status)
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg, repo)
+ inst = objpath.ManagedElement.to_instance()
+ self.assertNotEqual(inst, None,
+ "GetInstance() succeeds for package %s" % pkg)
+ refs = inst.associator_names(
AssocClass=self.CLASS_NAME,
- Role="AvailableSAP",
- ResultRole="ManagedElement",
- ResultClass="LMI_SoftwareIdentity")
- if repo.pkg_count:
- self.assertGreater(len(refs), 0,
- 'no software identities associated to repo "%s"' % repo.name)
- for ref in refs:
- self.assertEqual(ref.namespace, 'root/cimv2')
- self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
- self.assertEqual(ref.key_properties(), ["InstanceID"])
- self.assertTrue(
- ref.InstanceID.startswith("LMI:LMI_SoftwareIdentity:"))
-
- @mark_dangerous
- def test_get_managed_element_referents(self):
- """
- Test ReferenceNames for SoftwareIdentity.
- """
- for pkg, up in ((pkg, up) for pkg in self.dangerous_pkgs
- for up in (True, False)):
- objpath = self.make_op(pkg, newer=up)
- refs = objpath.ManagedElement.to_instance().associator_names(
+ Role="ManagedElement",
+ ResultRole="AvailableSAP",
+ ResultClass="LMI_SoftwareIdentityResource")
+ self.assertEqual(1, len(refs),
+ 'No repo found for pkg "%s".' % pkg.nevra)
+ ref = refs[0]
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource")
+ self.assertEqual(set(ref.key_properties()),
+ set(["SystemCreationClassName", "SystemName",
+ "Name", "CreationClassName"]))
+ self.assertEqual(ref.Name, pkg.repoid,
+ 'Repository name does not match for pkg "%s"'
+ % pkg.nevra)
+
+ @swbase.test_with_repos('updates')
+ @swbase.test_with_packages(**{
+ 'updates#pkg1' : True,
+ 'updates#pkg2' : True,
+ 'updates#pkg3' : False,
+ 'updates#pkg4' : False
+ })
+ def test_get_pkg_repository(self):
+ """
+ Test ``AssociatorNames()`` call on ``LMI_SoftwareIdentity``.
+
+ Try to get repository for installed and not installed repository.
+ """
+ repo = self.get_repo('updates')
+ self.assertTrue(repo.status)
+ for pkg in repo.packages:
+ objpath = self.make_op(pkg, repo)
+ inst = objpath.ManagedElement
+ self.assertNotEqual(inst, None,
+ "GetInstance() succeeds for package %s" % pkg)
+ refs = inst.to_instance().associators(
AssocClass=self.CLASS_NAME,
Role="ManagedElement",
ResultRole="AvailableSAP",
ResultClass="LMI_SoftwareIdentityResource")
self.assertEqual(1, len(refs),
- 'No repo found for pkg "%s".' % pkg.get_nevra(newer=up,
- with_epoch="ALWAYS"))
+ 'No repo found for pkg "%s".' % pkg.nevra)
ref = refs[0]
self.assertEqual(ref.namespace, 'root/cimv2')
self.assertEqual(ref.classname, "LMI_SoftwareIdentityResource")
- self.assertEqual(sorted(ref.key_properties()),
- sorted(["SystemCreationClassName", "SystemName",
+ self.assertEqual(set(ref.path.key_properties()),
+ set(["SystemCreationClassName", "SystemName",
"Name", "CreationClassName"]))
- self.assertEqual(
- getattr(pkg, 'up_repo' if up else 'repo'),
- ref.Name, 'Repository name does not match for pkg "%s"'%
- pkg.get_nevra(newer=up, with_epoch="ALWAYS"))
+ self.assertEqual(ref.Name, pkg.repoid,
+ 'Repository name does not match for pkg "%s"'
+ % pkg.nevra)
+ self.assertEqual(ref.ElementName, pkg.repoid)
+ self.assertEqual(ref.EnabledState, ENABLED_STATE_ENABLED)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py
index c4c5524..05ce92d 100644
--- a/src/software/test/test_software_identity.py
+++ b/src/software/test/test_software_identity.py
@@ -19,93 +19,228 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_SoftwareIdentity provider.
+Unit tests for ``LMI_SoftwareIdentity`` provider.
"""
from datetime import datetime, timedelta
import pywbem
import unittest
-from lmi.shell.LMIUtil import lmi_wrap_cim_instance_name
-from lmi.test.util import mark_dangerous
-import base
-import rpmcache
-import util
+from lmi.test.lmibase import enable_lmi_exceptions
+import package
+import swbase
-class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904
+class TestSoftwareIdentity(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on ``LMI_SoftwareIdentity``.
"""
CLASS_NAME = "LMI_SoftwareIdentity"
KEYS = ("InstanceID", )
- def make_op(self, pkg, newer=True):
+ def make_op(self, pkg):
"""
- @return object path of SoftwareIdentity
+ :returns: Object path of ``LMI_SoftwareIdentity``
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
+ if isinstance(pkg, package.Package):
+ nevra = pkg.nevra
+ else:
+ nevra = pkg
return self.cim_class.new_instance_name({
- "InstanceID" : 'LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(newer, "ALWAYS")
+ "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + nevra
})
- @mark_dangerous
- def test_get_instance(self):
- """
- Dangerous test, making sure, that properties are set correctly
- for installed and removed packages.
- """
- for pkg in self.dangerous_pkgs:
- if rpmcache.is_pkg_installed(pkg.name):
- rpmcache.remove_pkg(pkg.name)
- objpath = self.make_op(pkg)
- inst = objpath.to_instance()
- self.assertIsNone(inst.InstallDate)
- time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \
- - timedelta(seconds=0.01)
- rpmcache.install_pkg(pkg)
- inst.refresh()
- self.assertIsNotNone(inst.InstallDate)
- self.assertGreater(inst.InstallDate.datetime, time_stamp)
-
- def test_get_instance_safe(self):
- """
- Tests GetInstance call on packages from our rpm cache.
- """
- for pkg in self.safe_pkgs:
- objpath = self.make_op(pkg)
- inst = objpath.to_instance()
- self.assertCIMNameEqual(inst.path, objpath,
- "Object paths should match for package %s"%pkg)
- for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- self.assertIsInstance(inst.Caption, basestring)
- self.assertIsInstance(inst.Description, basestring)
- self.assertEqual(pkg.up_evra, inst.VersionString,
- "VersionString does not match evra for pkg %s" % pkg)
- self.assertTrue(inst.IsEntity)
- self.assertEqual(pkg.name, inst.Name,
- "Name does not match for pkg %s" % pkg)
- self.assertIsInstance(inst.Epoch, pywbem.Uint32,
- "Epoch does not match for pkg %s" % pkg)
- self.assertEqual(int(pkg.epoch), inst.Epoch)
- self.assertEqual(pkg.ver, inst.Version,
- "Version does not match for pkg %s" % pkg)
- self.assertEqual(pkg.rel, inst.Release,
- "Release does not match for pkg %s" % pkg)
- self.assertEqual(pkg.arch, inst.Architecture,
- "Architecture does not match for pkg %s" % pkg)
-
- # try to leave out epoch part
- if pkg.epoch == "0":
- op_no_epoch = objpath.copy()
- op_no_epoch.wrapped_object["SoftwareElementID"] = \
- util.make_nevra(pkg.name, pkg.up_epoch, pkg.up_ver,
- pkg.up_rel, pkg.arch, "NEVER")
- self.assertNotIn(":", op_no_epoch.SoftwareElementID)
- inst = op_no_epoch.to_instance()
- self.assertCIMNameIn(inst.path, (objpath, op_no_epoch))
+ def _check_package_instance(self, pkg, inst):
+ """
+ Check properties of ``LMI_SoftwareIdentity`` instance and its
+ correspondence to represented *pkg*.
+ """
+ self.assertNotEqual(inst, None,
+ "instance must exist for package %s" % pkg)
+ objpath = self.make_op(pkg)
+ self.assertCIMNameEqual(objpath, inst.path)
+ for key in self.KEYS:
+ self.assertEqual(getattr(inst, key), getattr(inst.path, key))
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
+ self.assertEqual(pkg.name, inst.Name)
+
+ self.assertEqual(inst.InstanceID, 'LMI:LMI_SoftwareIdentity:'
+ + pkg.nevra)
+ self.assertEqual(inst.Caption, pkg.summary)
+ self.assertIsInstance(inst.Description, basestring)
+ self.assertEqual(inst.VersionString, pkg.evra,
+ "VersionString does not match evra for pkg %s" % pkg)
+ self.assertTrue(inst.IsEntity)
+ self.assertEqual(inst.Name, pkg.name,
+ "Name does not match for pkg %s" % pkg)
+ self.assertIsInstance(inst.Epoch, pywbem.Uint32,
+ "Epoch does not match for pkg %s" % pkg)
+ self.assertEqual(inst.Epoch, pkg.epoch)
+ self.assertEqual(inst.Version, pkg.ver,
+ "Version does not match for pkg %s" % pkg)
+ self.assertEqual(inst.Release, pkg.rel,
+ "Release does not match for pkg %s" % pkg)
+ self.assertEqual(inst.Architecture, pkg.arch,
+ "Architecture does not match for pkg %s" % pkg)
+ self.assertEqual(inst.ElementName, pkg.nevra,
+ "ElementName must match nevra of pkg %s" % pkg)
+ self.assertTrue(inst.InstanceID.endswith(inst.ElementName),
+ "InstanceID has package nevra at its end for pkg %s" % pkg)
+
+ if package.is_pkg_installed(pkg):
+ self.assertNotEqual(inst.InstallDate, None)
+ else:
+ self.assertEqual(inst.InstallDate, None)
+
+ @swbase.test_with_packages(**{'stable#pkg1' : False})
+ def test_get_instance_installed(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentity`` with
+ installed package.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ package.remove_pkg(pkg.name)
+ package.install_pkg(pkg)
+ self.assertTrue(package.is_pkg_installed(pkg))
+
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg)
+ self.assertIsNotNone(inst.InstallDate)
+ time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \
+ - timedelta(seconds=5)
+ self.assertGreater(inst.InstallDate.datetime, time_stamp)
+ self._check_package_instance(pkg, inst)
+
+ @swbase.test_with_repos('stable')
+ @swbase.test_with_packages(**{'stable#pkg2' : False})
+ def test_get_instance_not_installed(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentity`` with not
+ installed package.
+ """
+ pkg = self.get_repo('stable')['pkg2']
+ self.assertFalse(package.is_pkg_installed(pkg))
+
+ self._check_package_instance(pkg, self.make_op(pkg).to_instance())
+
+ @swbase.test_with_packages('stable#pkg1', 'stable#pkg4')
+ def test_get_instance_without_epoch(self):
+ """
+ Test ``GetInstance()`` on ``LMI_SoftwareIdentity`` with epoch part
+ omitted.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ objpath = self.make_op(pkg)
+ objpath.wrapped_object['InstanceID'] = 'LMI:LMI_SoftwareIdentity:' \
+ + pkg.get_nevra('NEVER')
+ self.assertFalse('0:' in objpath.InstanceID)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ self.assertTrue('0:' in pkg.nevra)
+ self.assertTrue(inst.InstanceID.endswith(pkg.nevra))
+
+ @swbase.test_with_repos('stable')
+ @swbase.test_with_packages(**{'stable#pkg3' : False})
+ def test_refresh_after_install(self):
+ """
+ Test instance refresh of ``LMI_SoftwareIdentity`` after package
+ installation.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ self.assertFalse(package.is_pkg_installed(pkg))
+ inst = self.make_op(pkg).to_instance()
+ self.assertEqual(inst.InstallDate, None)
+ self._check_package_instance(pkg, inst)
+
+ # install it
+ package.install_pkg(pkg)
+ self.assertTrue(package.is_pkg_installed(pkg))
+ inst.refresh()
+ self.assertNotEqual(inst.InstallDate, None)
+ self._check_package_instance(pkg, inst)
+
+ @swbase.test_with_repos('stable')
+ @swbase.test_with_packages('stable#pkg4')
+ def test_refresh_after_uninstall(self):
+ """
+ Test instance refresh of ``LMI_SoftwareIdentity`` after package
+ uninstallation.
+ """
+ pkg = self.get_repo('stable')['pkg4']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ inst = self.make_op(pkg).to_instance()
+ self.assertNotEqual(inst.InstallDate, None)
+ self._check_package_instance(pkg, inst)
+
+ # install it
+ package.remove_pkg(pkg.name)
+ self.assertFalse(package.is_pkg_installed(pkg))
+ inst.refresh()
+ self.assertEqual(inst.InstallDate, None)
+ self._check_package_instance(pkg, inst)
+
+ @enable_lmi_exceptions
+ @swbase.test_with_packages('stable#pkg3')
+ def test_get_instance_invalid(self):
+ """
+ Test ``GetInstance()`` call on invalid object path.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ # leave out arch part
+ nvr = '%s-%s-%s' % (pkg.name, pkg.ver, pkg.rel)
+ objpath = self.make_op(nvr)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
+
+ # leave out version part
+ nra = '%s-%s.%s' % (pkg.name, pkg.rel, pkg.arch)
+ objpath = self.make_op(nra)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
+
+ # leave out release part
+ nva = '%s-%s.%s' % (pkg.name, pkg.ver, pkg.arch)
+ objpath = self.make_op(nva)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
+
+ # leave out name part
+ vra = '%s-%s.%s' % (pkg.ver, pkg.rel, pkg.arch)
+ objpath = self.make_op(vra)
+ self.assertRaisesCIM(pywbem.CIM_ERR_INVALID_PARAMETER,
+ objpath.to_instance)
+
+ # leave out just epoch
+ nvra = '%s-%s-%s.%s' % (pkg.name, pkg.ver, pkg.rel, pkg.arch)
+ objpath = self.make_op(nvra)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+
+ @swbase.test_with_packages('misc#funny-version', 'misc#funny-release')
+ def test_get_instance_funny(self):
+ """
+ Test ``GetInstance()`` on package with quite odd nevra.
+ """
+ pkg = self.get_repo('misc')['funny-version']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg)
+ self._check_package_instance(pkg, inst)
+
+ pkg = self.get_repo('misc')['funny-release']
+ self.assertTrue(package.is_pkg_installed(pkg))
+ objpath = self.make_op(pkg)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None, "GetInstance is successful on %s" % pkg)
+ self._check_package_instance(pkg, inst)
+
+ @enable_lmi_exceptions
+ def test_enum_instance_names(self):
+ """
+ Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentity``.
+ """
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED,
+ self.cim_class.instance_names)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_software_identity_checks.py b/src/software/test/test_software_identity_checks.py
index 20d4fd1..c29fa47 100644
--- a/src/software/test/test_software_identity_checks.py
+++ b/src/software/test/test_software_identity_checks.py
@@ -19,17 +19,22 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_SoftwareIdentityChecks provider.
+Unit tests for ``LMI_SoftwareIdentityChecks`` provider.
"""
+import os
import pywbem
+import subprocess
import unittest
from lmi.test.lmibase import enable_lmi_exceptions
-import base
+import package
+import swbase
import util
-class TestSoftwareIdentityChecks(base.SoftwareBaseTestCase):
+SOFTWARE_ELEMENT_STATE_EXECUTABLE = 2
+
+class TestSoftwareIdentityChecks(swbase.SwTestCase):
"""
Basic cim operations test.
"""
@@ -37,104 +42,160 @@ class TestSoftwareIdentityChecks(base.SoftwareBaseTestCase):
CLASS_NAME = "LMI_SoftwareIdentityChecks"
KEYS = ("Check", "Element")
- @classmethod
- def needs_pkgdb_files(cls):
- return True
-
- def make_op(self, pkg, filepath, newer=True):
+ def make_op(self, pkg, filepath):
"""
- @return object path of SoftwareIdentityChecks association
+ :returns: Object path of ``LMI_SoftwareIdentityChecks``
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"Check" : self.ns.LMI_SoftwareIdentityFileCheck.new_instance_name({
"CheckID" : 'LMI:LMI_SoftwareIdentityFileCheck',
"Name" : filepath,
- "SoftwareElementID" : pkg.get_nevra(
- newer=newer, with_epoch="ALWAYS"),
- "SoftwareElementState" : pywbem.Uint16(2), #Executable
+ "SoftwareElementID" : pkg.nevra,
+ "SoftwareElementState" : SOFTWARE_ELEMENT_STATE_EXECUTABLE,
"TargetOperatingSystem" : util.get_target_operating_system(),
- "Version" : getattr(pkg, 'up_evra' if newer else 'evra')
+ "Version" : pkg.evra
}),
"Element" : self.ns.LMI_SoftwareIdentity.new_instance_name({
- "InstanceID" : 'LMI:LMI_SoftwareIdentity:'
- + pkg.get_nevra(newer=newer, with_epoch="ALWAYS")
+ "InstanceID" : 'LMI:LMI_SoftwareIdentity:' + pkg.nevra
})
})
- @unittest.skip("unrealiable test on random packages")
+ def setUp(self):
+ to_uninstall = set()
+ for repo in self.repodb.values():
+ for pkg in repo.packages:
+ to_uninstall.add(pkg.name)
+ to_uninstall = [p for p in to_uninstall if package.is_pkg_installed(p)]
+ if to_uninstall:
+ subprocess.call(['/usr/bin/rpm', '--quiet', '-e'] + to_uninstall)
+
+ @swbase.test_with_packages('stable#pkg1')
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityChecks``.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ count = 0
+ for filepath in pkg:
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ "failed to get instance for %s:%s" % (pkg, filepath))
+ self.assertCIMNameEqual(inst.path, objpath)
+ for key in self.KEYS:
+ self.assertCIMNameEqual(
+ getattr(inst, key), getattr(inst.path, key))
+ count += 1
+ self.assertEqual(count, 7, "there are 7 files in pkg1")
+
+ @enable_lmi_exceptions
+ @swbase.test_with_packages('stable#pkg1')
+ def test_get_instance_invalid(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityChecks``
+ on not installed file.
"""
- for pkg in self.safe_pkgs:
- for filepath in self.pkgdb_files[pkg.name]:
- objpath = self.make_op(pkg, filepath)
- inst = objpath.to_instance()
- self.assertCIMNameEqual(inst.path, objpath,
- "Object paths should match for package %s"%pkg)
- for key in self.KEYS:
- self.assertIn(key, inst.properties(),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- if key == "SoftwareElementID":
- setattr(inst, key, getattr(inst, key)[:-1])
- self.assertCIMNameEqual(
- getattr(inst, key), getattr(inst.path, key))
- self.assertCIMNameEqual(objpath, inst.path,
- "Object paths should match for package %s"%pkg)
+ pkg = self.get_repo('stable')['pkg1']
+ objpath = self.make_op(pkg, '/not-installed-file')
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath.to_instance)
@enable_lmi_exceptions
def test_enum_instance_names(self):
"""
- Tests EnumInstances call on identity checks - should not be supported.
+ Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentityChecks``.
+ Should not be supported.
"""
self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED,
self.cim_class.instance_names)
- def test_get_identity_referents(self):
+ @swbase.test_with_packages('stable#pkg3')
+ def test_package_file_checks(self):
+ """
+ Try to get file checks associated with package.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ filepath = '/usr/share/openlmi-sw-test-pkg3'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.Element.to_instance()
+ self.assertNotEqual(inst, None,
+ "get instance on %s:%s succeeds" % (pkg, filepath))
+ refs = inst.associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Element",
+ ResultRole="Check",
+ ResultClass="LMI_SoftwareIdentityFileCheck")
+ self.assertEqual(len(refs), len(pkg))
+ self.assertEqual(len(refs), 10, "there are 10 files in %s" % pkg.name)
+ pkg_names = set(pkg.files)
+ for ref in refs:
+ self.assertIn(ref.Name, set(r.Name for r in refs))
+ self.assertEqual(sorted(ref.path.key_properties()),
+ ["CheckID", "Name", "SoftwareElementID",
+ "SoftwareElementState",
+ "TargetOperatingSystem", "Version"])
+ self.assertEqual(ref.SoftwareElementID, pkg.nevra)
+ self.assertIn(ref.Name, pkg_names)
+ self.assertEqual(ref.FailedFlags, [])
+ pkg_names.remove(ref.Name)
+ self.assertEqual(len(pkg_names), 0)
+
+ @swbase.test_with_packages('stable#pkg3')
+ def test_package_file_check_names(self):
"""
- Test AssociatorNames for Element(SoftwareIdentity).
+ Try to get file check names associated with package.
"""
- for pkg in self.safe_pkgs:
- files = self.pkgdb_files[pkg.name]
- if not files:
- # package may not have any file
- continue
- objpath = self.make_op(pkg, files[0])
- refs = objpath.Element.to_instance().associator_names(
- AssocClass=self.CLASS_NAME,
- Role="Element",
- ResultRole="Check",
- ResultClass="LMI_SoftwareIdentityFileCheck")
- for ref in refs:
- self.assertEqual(ref.namespace, 'root/cimv2')
- self.assertEqual(ref.classname,
- "LMI_SoftwareIdentityFileCheck")
- self.assertIn(ref.Name, set(r.Name for r in refs))
- self.assertEqual(sorted(ref.key_properties()),
- ["CheckID", "Name", "SoftwareElementID",
- "SoftwareElementState",
- "TargetOperatingSystem", "Version"])
- self.assertEqual(
- pkg.get_nevra(newer=True, with_epoch='ALWAYS'),
- ref.SoftwareElementID)
-
- @unittest.skip("unrealiable test on random packages")
- def test_get_check_referents(self):
+ pkg = self.get_repo('stable')['pkg3']
+ objpath = self.make_op(pkg, '/usr/share/openlmi-sw-test-pkg3')
+ refs = objpath.Element.to_instance().associator_names(
+ AssocClass=self.CLASS_NAME,
+ Role="Element",
+ ResultRole="Check",
+ ResultClass="LMI_SoftwareIdentityFileCheck")
+ self.assertEqual(len(refs), len(pkg))
+ self.assertEqual(len(refs), 10, "there are 10 files in %s" % pkg.name)
+ pkg_names = set(pkg.files)
+ for ref in refs:
+ self.assertIn(ref.Name, set(r.Name for r in refs))
+ self.assertEqual(sorted(ref.key_properties()),
+ ["CheckID", "Name", "SoftwareElementID",
+ "SoftwareElementState",
+ "TargetOperatingSystem", "Version"])
+ self.assertEqual(ref.SoftwareElementID, pkg.nevra)
+ self.assertIn(ref.Name, pkg_names)
+ pkg_names.remove(ref.Name)
+ self.assertEqual(len(pkg_names), 0)
+
+ @swbase.test_with_packages('stable#pkg3')
+ def test_file_check_package(self):
"""
- Test AssociatorNames for Check(SoftwareIdentityFileCheck).
+ Try to get package assocatied to file check.
"""
- for pkg in self.safe_pkgs:
- files = self.pkgdb_files[pkg.name]
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
- refs = objpath.Check.to_instance().associator_names(
- AssocClass=self.CLASS_NAME,
- Role="Check",
- ResultRole="Element",
- ResultClass="LMI_SoftwareIdentity")
- self.assertEqual(len(refs), 1)
- ref = refs[0]
- self.assertCIMNameEqual(objpath.Element, ref)
+ pkg = self.get_repo('stable')['pkg3']
+ filepath = '/usr/share/openlmi-sw-test-pkg3/README'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.Check.to_instance()
+ self.assertNotEqual(inst, None,
+ "get instance for %s:%s succeeds" % (pkg, filepath))
+ refs = inst.associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Check",
+ ResultRole="Element",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertEqual(len(refs), 1)
+ ref = refs[0]
+ self.assertCIMNameEqual(ref.path, objpath.Element)
+ self.assertEqual(ref.ElementName, pkg.nevra)
+ self.assertEqual(ref.VersionString, pkg.evra)
+
+ # try with removed file
+ os.remove(filepath)
+ refs = objpath.Check.to_instance().associators(
+ AssocClass=self.CLASS_NAME,
+ Role="Check",
+ ResultRole="Element",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertEqual(len(refs), 1)
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_software_identity_file_check.py b/src/software/test/test_software_identity_file_check.py
index 485a970..4783076 100644
--- a/src/software/test/test_software_identity_file_check.py
+++ b/src/software/test/test_software_identity_file_check.py
@@ -19,25 +19,77 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_SoftwareIdentity provider.
+Unit tests for ``LMI_SoftwareIdentityFileCheck`` provider.
"""
+import shutil
import os
import pywbem
import re
+import stat
import subprocess
import unittest
from lmi.test.lmibase import enable_lmi_exceptions
-from lmi.test.util import mark_dangerous
-import base
-import rpmcache
+import package
+import swbase
import util
RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*')
-class TestSoftwareIdentityFileCheck(
- base.SoftwareBaseTestCase): #pylint: disable=R0904
+SOFTWARE_ELEMENT_STATE_EXECUTABLE = 2
+FILE_TYPE_UNKNOWN = 0
+FILE_TYPE_FILE = 1
+FILE_TYPE_DIRECTORY = 2
+FILE_TYPE_SYMLINK = 3
+FILE_TYPE_FIFO = 4
+FILE_TYPE_CHARDEV = 5
+FILE_TYPE_BLOCKDEV = 6
+FAILED_FLAGS_EXISTENCE = 0
+FAILED_FLAGS_SIZE = 1
+FAILED_FLAGS_MODE = 2
+FAILED_FLAGS_CHECKSUM = 3
+FAILED_FLAGS_DEVNUM = 4
+FAILED_FLAGS_TARGET = 5
+FAILED_FLAGS_UID = 6
+FAILED_FLAGS_GID = 7
+FAILED_FLAGS_MTIME = 8
+FILE_MODE_XOTH = 0
+FILE_MODE_WOTH = 1
+FILE_MODE_ROTH = 2
+FILE_MODE_XGRP = 3
+FILE_MODE_WGRP = 4
+FILE_MODE_RGRP = 5
+FILE_MODE_XUSR = 6
+FILE_MODE_WUSR = 7
+FILE_MODE_RUSR = 8
+FILE_MODE_SVTX = 9 # sticky bit
+FILE_MODE_SGID = 10
+FILE_MODE_SUID = 11
+INVOKE_SATISFIED = 0
+INVOKE_NOT_SATISFIED = 2
+
+# maps algorithm id to the shell command
+HASH_COMMAND = {
+ 1 : "md5sum",
+ 2 : "sha1sum",
+ 8 : "sha256sum",
+ 9 : "sha384sum",
+ 10 : "sha512sum",
+ 11 : "sha224sum"
+}
+
+# maps algorithm id to the length of digest string
+HASH_DIGEST_LENGTH = {
+ 1 : 32, #MD5
+ 2 : 40, #SHA1
+ 8 : 64, #SHA256
+ 9 : 96, #SHA384
+ 10 : 128, #SHA512
+ 11 : 56 #SHA224
+}
+
+class TestSoftwareIdentityFileCheck(swbase.SwTestCase):
"""
Basic cim operations test.
"""
@@ -45,103 +97,84 @@ class TestSoftwareIdentityFileCheck(
CLASS_NAME = "LMI_SoftwareIdentityFileCheck"
KEYS = ("CheckID", "Name", "SoftwareElementID", "SoftwareElementState")
- # maps algorithm id to the shell command
- HASH_COMMAND = {
- 1 : "md5sum",
- 2 : "sha1sum",
- 8 : "sha256sum",
- 9 : "sha384sum",
- 10 : "sha512sum",
- 11 : "sha224sum"
- }
-
- # maps algorithm id to the length of digest string
- HASH_DIGEST_LENGTH = {
- 1 : 32, #MD5
- 2 : 40, #SHA1
- 8 : 64, #SHA256
- 9 : 96, #SHA384
- 10 : 128, #SHA512
- 11 : 56 #SHA224
- }
-
- @classmethod
- def needs_pkgdb_files(cls):
- return True
-
def make_checksum_str(self, csumnum, filename):
"""
- @return checksum of installed file
+ :returns: Checksum of installed file.
+ :rtype: string
"""
return RE_CHECKSUM.match(subprocess.check_output([
- self.HASH_COMMAND[csumnum], filename])).group(1).lower()
+ HASH_COMMAND[csumnum], filename])).group(1).lower()
- def make_op(self, pkg, file_name, newer=True):
+ def make_op(self, pkg, file_name):
"""
- :returns: object path of `LMI_SoftwareIdentityFileCheck`
+ :returns: Object path of ``LMI_SoftwareIdentityFileCheck``
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"CheckID" : 'LMI:LMI_SoftwareIdentityFileCheck',
"Name" : file_name,
- "SoftwareElementID" : pkg.get_nevra(
- newer=newer, with_epoch="ALWAYS"),
- "SoftwareElementState" : pywbem.Uint16(2), #Executable
+ "SoftwareElementID" : pkg.nevra,
+ "SoftwareElementState" : pywbem.Uint16(
+ SOFTWARE_ELEMENT_STATE_EXECUTABLE),
"TargetOperatingSystem" : util.get_target_operating_system(),
- "Version" : getattr(pkg, 'up_evra' if newer else 'evra')
+ "Version" : pkg.evra
})
- def do_check_symlink(self, pkg, filepath, inst, safe=False):
+ def setUp(self):
+ to_uninstall = set()
+ for repo in self.repodb.values():
+ for pkg in repo.packages:
+ to_uninstall.add(pkg.name)
+ to_uninstall = [p for p in to_uninstall if package.is_pkg_installed(p)]
+ if to_uninstall:
+ subprocess.call(['/usr/bin/rpm', '--quiet', '-e'] + to_uninstall)
+
+ def do_check_symlink(self, pkg, filepath, inst):
"""
- Assert some details about symlink.
- :param safe: (``bool``) If True, no modification is done to file.
+ Check symbolink link.
"""
target = os.readlink(filepath)
stats = os.lstat(filepath)
- self.assertEqual(pywbem.Uint16(3), inst.FileType,
- "Unexpected file type of symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst.UserID,
- "Unexpected uid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst.GroupID,
- "Unexpected gid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst.FileMode,
- "Unexpected mode of symlink for %s:%s" %(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst.FileSize,
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(target, inst.LinkTarget,
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst.LastModificationTime,
- "Unexpected mtime of symlink for %s:%s"%(pkg.name, filepath))
+ self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_SYMLINK),
+ "Unexpected file type of symlink for %s:%s"
+ % (pkg.name, filepath))
+ self.assertEqual(inst.UserID, stats.st_uid,
+ "Unexpected uid of symlink for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.GroupID, stats.st_gid,
+ "Unexpected gid of symlink for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.FileMode, stats.st_mode,
+ "Unexpected mode of symlink for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.FileSize, stats.st_size,
+ "Unexpected size of symlink for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.LinkTarget, target,
+ "Unexpected target of symlink for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.LastModificationTime, int(stats.st_mtime),
+ "Unexpected mtime of symlink for %s:%s" % (pkg.name, filepath))
self.assertIsNone(inst.Checksum,
- "Checksum should be None for symlink %s:%s"%(
- pkg.name, filepath))
+ "Checksum should be None for symlink %s:%s"
+ % (pkg.name, filepath))
self.assertIsNone(inst.FileChecksum,
- "FileChecksum should be None for symlink %s:%s"%(
- pkg.name, filepath))
+ "FileChecksum should be None for symlink %s:%s"
+ % (pkg.name, filepath))
self.assertIsNone(inst.MD5Checksum,
- "MD5Checksum should be None for symlink %s:%s"%(
- pkg.name, filepath))
-
- if safe:
- return
+ "MD5Checksum should be None for symlink %s:%s"
+ % (pkg.name, filepath))
# modify owner
prev_user = inst.UserID
os.lchown(filepath, stats.st_uid + 1, -1)
inst.refresh()
- self.assertEqual(inst.UserIDOriginal + 1, inst.UserID,
+ self.assertEqual(inst.UserID, inst.UserIDOriginal + 1,
"Unexpected uid of modified symlink for %s:%s"%(
pkg.name, filepath))
- self.assertEqual(prev_user + 1, inst.UserID,
+ self.assertEqual(inst.UserID, prev_user + 1,
"Unexpected uid of modified symlink for %s:%s"%(
pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst.GroupID,
+ self.assertEqual(inst.GroupID, stats.st_gid,
"Unexpected gid of modified symlink for %s:%s"%(
pkg.name, filepath))
- self.assertEqual([
- pywbem.Uint16(7) #UserID
- ], inst.FailedFlags)
+ self.assertEqual(inst.FailedFlags, [FAILED_FLAGS_UID])
# modify link_target
os.remove(filepath)
@@ -150,95 +183,87 @@ class TestSoftwareIdentityFileCheck(
os.lchown(filepath, inst.UserIDOriginal, inst.GroupIDOriginal)
inst.refresh()
- self.assertEqual(set([
- pywbem.Uint16(1), #FileSize
- pywbem.Uint16(6), #LinkTarget
- ]), set(inst.FailedFlags))
- self.assertGreater(len(inst.LinkTarget),
- len(inst.LinkTargetOriginal))
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_TARGET]))
+ self.assertGreater(len(inst.LinkTarget), len(inst.LinkTargetOriginal))
self.assertTrue(inst.FileExists,
"File %s:%s should exist"%(pkg.name, filepath))
- self.assertEqual(inst.FileTypeOriginal, inst.FileType,
+ self.assertEqual(inst.FileType, inst.FileTypeOriginal,
"File type should match for symlink %s:%s"%(pkg.name, filepath))
self.assertNotEqual(inst.FileSizeOriginal, inst.FileSize,
"File size should not match for symlink %s:%s"%(
pkg.name, filepath))
- self.assertEqual(inst.FileModeOriginal, inst.FileMode,
+ self.assertEqual(inst.FileMode, inst.FileModeOriginal,
"File mode should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertEqual(lt_modif, inst.LinkTarget,
+ self.assertEqual(inst.LinkTarget, lt_modif,
"Link target should match modified path %s:%s"%(
pkg.name, filepath))
- self.assertNotEqual(inst.LinkTargetOriginal, inst.ListTarget,
+ self.assertNotEqual(inst.LinkTargetOriginal, inst.LinkTarget,
"Link target should not match for symlink %s:%s"%(
pkg.name, filepath))
- self.assertEqual(inst.UserIDOriginal, inst.UserID,
+ self.assertEqual(inst.UserID, inst.UserIDOriginal,
"File uid should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertEqual(inst.GroupIDOriginal, inst.GroupID,
+ self.assertEqual(inst.GroupID, inst.GroupIDOriginal,
"File gid should match for symlink %s:%s"%(pkg.name, filepath))
def do_check_directory(self, pkg, filepath, inst):
"""
- Assert some details about directory.
+ Check directory.
"""
stats = os.lstat(filepath)
- self.assertEqual(pywbem.Uint16(2), inst.FileType,
+ self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_DIRECTORY),
"Unexpected type for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst.UserID,
+ self.assertEqual(inst.UserID, stats.st_uid,
"Unexpected uid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst.GroupID,
+ self.assertEqual(inst.GroupID, stats.st_gid,
"Unexpected gid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst.FileMode,
+ self.assertEqual(inst.FileMode, stats.st_mode,
"Unexpected mode for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst.FileSize,
+ self.assertEqual(inst.FileSize, stats.st_size,
"Unexpected size for directory %s:%s"%(pkg.name, filepath))
self.assertIs(inst.LinkTarget, None)
self.assertIsNone(inst.FileChecksum,
"Unexpected checksum for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst.LastModificationTime,
+ self.assertEqual(inst.LastModificationTime, int(stats.st_mtime),
"Unexpected mtime for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual([], inst.FailedFlags)
+ self.assertEqual(inst.FailedFlags, [])
- def do_check_file(self, pkg, filepath, inst, safe=False):
+ def do_check_file(self, pkg, filepath, inst):
"""
- Assert some details about regular file.
- :param safe: (``bool``) If True, no modification is done to file.
+ Check regular file.
"""
stats = os.lstat(filepath)
- self.assertEqual(pywbem.Uint16(1), inst.FileType,
+ self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_FILE),
"Unexpected file type for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst.UserID,
+ self.assertEqual(inst.UserID, stats.st_uid,
"Unexpected file uid for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst.GroupID,
+ self.assertEqual(inst.GroupID, stats.st_gid,
"Unexpected gid for regular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst.FileMode,
+ self.assertEqual(inst.FileMode, stats.st_mode,
"Unexpected mode for regular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst.FileSize,
+ self.assertEqual(inst.FileSize, stats.st_size,
"Unexpected size for regular file %s:%s"%(pkg.name, filepath))
self.assertIs(inst.LinkTarget, None)
csum = self.make_checksum_str(inst.ChecksumType, filepath)
- self.assertEqual(csum, inst.FileChecksum.lower(),
+ self.assertEqual(inst.FileChecksum.lower(), csum,
"Unexpected checksum for regular file %s:%s"%(pkg.name, filepath))
self.assertEqual(inst.LastModificationTime,
inst.LastModificationTimeOriginal,
"Unexpected mtime for regular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst.LastModificationTime,
+ self.assertEqual(inst.LastModificationTime, int(stats.st_mtime),
"Unexpected mtime for regular file %s:%s"%(pkg.name, filepath))
- if safe:
- return
-
# make it longer
with open(filepath, "a+") as fobj:
fobj.write("data\n")
inst.refresh()
- self.assertEqual(set([
- pywbem.Uint16(1), #FileSize
- pywbem.Uint16(3), #Checksum
- pywbem.Uint16(9), #LastModificationTime
- ]), set(inst.FailedFlags))
+ self.assertEqual(set(inst.FailedFlags), set([
+ FAILED_FLAGS_SIZE,
+ FAILED_FLAGS_CHECKSUM,
+ FAILED_FLAGS_MTIME,
+ ]))
self.assertGreater(inst.FileSize, inst.FileSizeOriginal,
"File size should be greater, then expected for regular file"
@@ -266,15 +291,14 @@ class TestSoftwareIdentityFileCheck(
pkg.name, filepath))
self.assertNotEqual(inst.FileTypeOriginal, inst.FileType,
"File type should not match for %s:%s"%(pkg.name, filepath))
- self.assertEqual(pywbem.Uint16(3), inst.FileType,
+ self.assertEqual(inst.FileType, pywbem.Uint16(FILE_TYPE_SYMLINK),
"File type should match for %s:%s"%(pkg.name, filepath))
- self.assertIn(pywbem.Uint16(2), #FileMode
- inst.FailedFlags)
+ self.assertIn(pywbem.Uint16(FAILED_FLAGS_MODE), inst.FailedFlags)
# remove it
os.remove(filepath)
inst.refresh()
- self.assertEqual(inst.LinkTargetOriginal, inst.LinkTarget,
+ self.assertEqual(inst.LinkTarget, inst.LinkTargetOriginal,
"Link target does not match for regular file %s:%s"%(
pkg.name, filepath))
self.assertNotEqual(inst.FileSizeOriginal, inst.FileSize,
@@ -286,20 +310,58 @@ class TestSoftwareIdentityFileCheck(
self.assertIsNone(inst.FileMode)
self.assertIsNone(inst.UserID)
self.assertIsNone(inst.GroupID)
- self.assertEqual([
- pywbem.Uint16(0), #exists
- ], inst.FileExists)
+ self.assertFalse(inst.FileExists)
+ self.assertEqual(inst.FailedFlags, [FAILED_FLAGS_EXISTENCE])
- def check_filepath(self, pkg, filepath, safe=False):
+ def do_check_dev(self, pkg, filepath, inst):
+ """
+ Check device file.
+ """
+ stats = os.lstat(filepath)
+
+ is_block = stat.S_ISBLK(stats.st_mode)
+ self.assertEqual(inst.FileType, pywbem.Uint16(
+ FILE_TYPE_BLOCKDEV if is_block else FILE_TYPE_CHARDEV),
+ "Unexpected file type of dev file for %s:%s"
+ % (pkg.name, filepath))
+ self.assertEqual(inst.UserID, stats.st_uid,
+ "Unexpected uid of dev file for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.GroupID, stats.st_gid,
+ "Unexpected gid of dev file for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.FileMode, stats.st_mode,
+ "Unexpected mode of dev file for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.FileSize, stats.st_size,
+ "Unexpected size of dev file for %s:%s" % (pkg.name, filepath))
+ self.assertEqual(inst.LinkTarget, None)
+ self.assertEqual(inst.LastModificationTime, int(stats.st_mtime),
+ "Unexpected mtime of dev file for %s:%s" % (pkg.name, filepath))
+ self.assertIsNone(inst.Checksum,
+ "Checksum should be None for dev file %s:%s"
+ % (pkg.name, filepath))
+ self.assertIsNone(inst.FileChecksum,
+ "FileChecksum should be None for dev file %s:%s"
+ % (pkg.name, filepath))
+ self.assertIsNone(inst.MD5Checksum,
+ "MD5Checksum should be None for dev file %s:%s"
+ % (pkg.name, filepath))
+
+ os.remove(filepath)
+ subprocess.call(['/usr/bin/mknod', filepath, 'b' if is_block else 'c',
+ str(os.major(stats.st_rdev) + 1), str(os.minor(stats.st_rdev) + 1)])
+ self.assertTrue(os.path.exists(filepath))
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_DEVNUM]))
+ self.assertEqual(inst.FileMode, stats.st_mode)
+
+ def check_filepath(self, pkg, filepath):
"""
Make a check of particular file of package.
All files are expected to have no flaw.
-
- :param safe: (``bool``) says, whether modifications to file can be done
"""
objpath = self.make_op(pkg, filepath)
inst = objpath.to_instance()
- self.assertCIMNameEqual(objpath, inst.path,
+ self.assertNotEqual(inst, None)
+ self.assertCIMNameEqual(inst.path, objpath,
msg="Object paths of instance must match for %s:%s"%(
pkg.name, filepath))
for key in self.KEYS:
@@ -312,8 +374,9 @@ class TestSoftwareIdentityFileCheck(
self.assertTrue(inst.FileExists,
"File %s:%s must exist"%(pkg.name, filepath))
- self.assertEqual(0, len(inst.FailedFlags),
- "FailedFlags must be empty")
+ self.assertEqual(len(inst.FailedFlags), 0,
+ "FailedFlags must be empty for %s:%s, not: %s" % (
+ str(pkg), filepath, inst.FailedFlags))
for prop in ( "FileType", "UserID", "GroupID"
, "FileMode", "FileSize", "LinkTarget"
@@ -327,91 +390,435 @@ class TestSoftwareIdentityFileCheck(
getattr(inst, prop+"Original"), getattr(inst, prop),
"%s should match for %s:%s"%(prop, pkg.name, filepath))
if os.path.islink(filepath):
- self.do_check_symlink(pkg, filepath, inst, safe=safe)
+ self.do_check_symlink(pkg, filepath, inst)
elif os.path.isdir(filepath):
self.do_check_directory(pkg, filepath, inst)
elif os.path.isfile(filepath):
- self.do_check_file(pkg, filepath, inst, safe=safe)
+ self.do_check_file(pkg, filepath, inst)
- @unittest.skip("not reliable test for random packages")
- def test_get_instance_safe(self):
+ @swbase.test_with_packages('stable#pkg1')
+ def test_get_directory(self):
"""
- Tests GetInstance call.
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ directory.
"""
- for pkg in self.safe_pkgs:
- files = self.pkgdb_files[pkg.name]
- for filepath in files:
- self.check_filepath(pkg, filepath, safe=True)
+ pkg = self.get_repo('stable')['pkg1']
+ filepath = '/usr/share/openlmi-sw-test-pkg1'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileType, FILE_TYPE_DIRECTORY)
+ self.check_filepath(pkg, filepath)
+
+ @swbase.test_with_packages('stable#pkg1')
+ def test_check_config(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ config file..
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ filepath = '/etc/openlmi/software/test/dummy_config.cfg'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileType, FILE_TYPE_FILE)
+ self.check_filepath(pkg, filepath)
+
+ @enable_lmi_exceptions
+ @swbase.test_with_packages('stable#pkg1', 'stable#pkg2')
+ def test_get_instance_invalid(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ file not belonging to particular package.
+ """
+ stable = self.get_repo('stable')
+ pkg1 = stable['pkg1']
+ pkg2 = stable['pkg2']
+ fp1 = '/etc/openlmi/software/test/dummy_config.cfg'
+ fp2 = '/usr/share/openlmi-sw-test-pkg2'
+ objpath1 = self.make_op(pkg1, fp2)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath1.to_instance)
+ objpath2 = self.make_op(pkg2, fp1)
+ self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND, objpath2.to_instance)
+ objpath1.wrapped_object["Name"] = fp1
+ self.assertNotEqual(objpath1.to_instance(), None)
+ objpath2.wrapped_object["Name"] = fp2
+ self.assertNotEqual(objpath2.to_instance(), None)
+
+ @swbase.test_with_packages('stable#pkg2')
+ def test_check_symlink(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ symbolic link.
+ """
+ pkg = self.get_repo('stable')['pkg2']
+ filepath = '/usr/share/openlmi-sw-test-pkg2/symlinks/absolute'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileType, FILE_TYPE_SYMLINK)
+ self.check_filepath(pkg, filepath)
+
+ @swbase.test_with_packages('stable#pkg3')
+ def test_check_chardev(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ block and character device files.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ filepath = '/usr/share/openlmi-sw-test-pkg3/devs/char12'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ self.assertEqual(inst.FileType, FILE_TYPE_CHARDEV)
+ self.check_filepath(pkg, filepath)
+ self.do_check_dev(pkg, filepath, inst)
+
+ @swbase.test_with_packages('stable#pkg3')
+ def test_check_blockdev(self):
+ """
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityFileCheck`` with
+ block and character device files.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ filepath = '/usr/share/openlmi-sw-test-pkg3/devs/block56'
+ objpath = self.make_op(pkg, filepath)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ self.assertEqual(inst.FileType, FILE_TYPE_BLOCKDEV)
+ self.check_filepath(pkg, filepath)
+ self.do_check_dev(pkg, filepath, inst)
+
+ @swbase.test_with_packages('stable#pkg4')
+ def test_permissions(self):
+ """
+ Test ``FileMode`` property of ``LMI_SoftwareIdentityFileCheck``.
+ """
+ pkg = self.get_repo('stable')['pkg4']
+ permsdir = '/usr/share/openlmi-sw-test-pkg4/perms'
+ objpath = self.make_op(pkg, permsdir)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ permsdir += '/'
+ self.assertEqual(inst.FileMode, stat.S_IFDIR | stat.S_ISVTX | 0777)
+ self.assertEqual(set(inst.FileModeFlags), set(range(10)))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rwxrwxrwx'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 0777)
+ self.assertEqual(set(inst.FileModeFlags), set(range(9)))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rw-rw-rw-'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 0666)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR,
+ FILE_MODE_WOTH, FILE_MODE_WGRP, FILE_MODE_WUSR]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'r--r--r--'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 0444)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'r-xr-xr-x'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 0555)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR,
+ FILE_MODE_XOTH, FILE_MODE_XGRP, FILE_MODE_XUSR]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'r---w---x'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 0421)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_RUSR, FILE_MODE_WGRP, FILE_MODE_XOTH]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rwSr--r--'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 04644)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_RUSR, FILE_MODE_WUSR,
+ FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SUID]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rw-r-Sr--'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 02644)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_RUSR, FILE_MODE_WUSR,
+ FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SGID]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rw-r--r-T'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 01644)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_RUSR, FILE_MODE_WUSR,
+ FILE_MODE_RGRP, FILE_MODE_ROTH, FILE_MODE_SVTX]))
+
+ objpath.wrapped_object["Name"] = permsdir + 'rwsr-sr-t'
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileMode, stat.S_IFREG | 07755)
+ self.assertEqual(set(inst.FileModeFlags), set([
+ FILE_MODE_ROTH, FILE_MODE_RGRP, FILE_MODE_RUSR,
+ FILE_MODE_XOTH, FILE_MODE_XGRP, FILE_MODE_XUSR,
+ FILE_MODE_WUSR, FILE_MODE_SUID, FILE_MODE_SGID, FILE_MODE_SVTX]))
@enable_lmi_exceptions
def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames - which should not be supported.
+ Test ``EnumInstanceNames`` call on ``LMI_SoftwareIdentityFileCheck``
+ that should not be supported.
"""
self.assertRaisesCIM(pywbem.CIM_ERR_NOT_SUPPORTED,
self.cim_class.instance_names)
- @enable_lmi_exceptions
- @unittest.skip("not reliable test for random packages")
- def test_invoke_method_safe(self):
- """
- Test Invoke method in a safe manner.
- """
- for pkg in self.safe_pkgs:
- files = self.pkgdb_files[pkg.name]
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
- inst = objpath.to_instance()
-
- (rval, _, _) = inst.Invoke()
- self.assertEqual(pywbem.Uint32(0), #Satisfied
- rval,
- msg="Invoke method should be successful for %s:%s"%(
- pkg.name, filepath))
-
- (rval, _, _) = inst.InvokeOnSystem(
- TargetSystem=self.system_iname)
-
- self.assertEqual(pywbem.Uint32(0), #Satisfied
- rval, msg="InvokeOnSystem method should be successful"
- " for %s:%s"%(pkg.name, filepath))
-
- system_iname = self.system_iname.wrapped_object
- system_iname["CreationClassName"] = "Bad class name"
- system_iname["Name"] = "some random name"
- self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND,
- inst.InvokeOnSystem, TargetSystem=system_iname)
-
- @mark_dangerous
- def test_invoke_method(self):
- """
- Tests Invoke method invocation.
- """
- for pkg in self.dangerous_pkgs:
- if ( rpmcache.is_pkg_installed(pkg.name)
- and not util.verify_pkg(pkg.name)):
- rpmcache.remove_pkg(pkg.name)
- if not rpmcache.is_pkg_installed(pkg.name):
- rpmcache.install_pkg(pkg)
- for filepath in self.pkgdb_files[pkg.name]:
- objpath = self.make_op(pkg, filepath)
- inst = objpath.to_instance()
-
- (rval, _) = inst.Invoke()
- self.assertEqual(pywbem.Uint32(0), #Satisfied
- rval, "Invoke method should be successful for %s:%s"
- % (pkg.name, filepath))
-
- # modify file
- if os.path.isfile(filepath):
- os.remove(filepath)
- else:
- os.lchown(filepath, os.stat(filepath).st_uid + 1, -1)
- (rval, _) = inst.Invoke()
- self.assertEqual(pywbem.Uint32(2), #Not Satisfied
- rval,
- "Invoke method should not pass for modified file %s:%s"
- % (pkg.name, filepath))
+ @swbase.test_with_packages('stable#pkg1')
+ def test_method_invoke_on_file(self):
+ """
+ Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on regular
+ file.
+ """
+ pkg = self.get_repo('stable')['pkg1']
+ fp = '/usr/share/openlmi-sw-test-pkg1/README'
+ objpath = self.make_op(pkg, fp)
+ inst = objpath.to_instance()
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "Invoke method should be successful for %s:%s" % (pkg, fp))
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "InvokeOnSystem method should be successful for %s:%s"
+ % (pkg.name, fp))
+
+ stats = os.stat(fp)
+ backup = fp + '.bak'
+ shutil.copy(fp, backup)
+
+ # change mode
+ os.chmod(fp, 0600)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE]))
+
+ # restore it
+ shutil.copy(backup, fp)
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # modify contents
+ with open(fp, 'a') as fobj:
+ fobj.write('new line\n')
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags),
+ set([FAILED_FLAGS_SIZE, FAILED_FLAGS_CHECKSUM]))
+
+ # restore it
+ shutil.copy(backup, fp)
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # touch it
+ os.utime(fp, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MTIME]))
+
+ @swbase.test_with_packages('stable#pkg2')
+ def test_method_invoke_on_symlink(self):
+ """
+ Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on symbolic
+ link.
+ """
+ pkg = self.get_repo('stable')['pkg2']
+ fp = '/usr/share/openlmi-sw-test-pkg2/symlinks/relative'
+ objpath = self.make_op(pkg, fp)
+ inst = objpath.to_instance()
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "Invoke method should be successful for %s:%s" % (pkg, fp))
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "InvokeOnSystem method should be successful for %s:%s"
+ % (pkg.name, fp))
+ stats = os.stat(fp)
+ backup = fp + '.bak'
+ shutil.copy(fp, backup)
+
+ # touch it (mtime)
+ os.utime(fp, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # change owner
+ os.lchown(fp, 1, -1)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_UID]))
+
+ # remove symlink
+ os.remove(fp)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_EXISTENCE]))
+
+ # restore it (mtime)
+ os.symlink("../data/target.txt", fp)
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # point to different target
+ os.remove(fp)
+ os.symlink("../data/target.xtx", fp)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_TARGET]))
+
+ @swbase.test_with_packages('stable#pkg3')
+ def test_method_invoke_on_fifo(self):
+ """
+ Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck`` on fifo.
+ """
+ pkg = self.get_repo('stable')['pkg3']
+ fp = '/usr/share/openlmi-sw-test-pkg3/fifos/pipe'
+ objpath = self.make_op(pkg, fp)
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileType, FILE_TYPE_FIFO)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "Invoke method should be successful for %s:%s" % (pkg, fp))
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "InvokeOnSystem method should be successful for %s:%s"
+ % (pkg.name, fp))
+ stats = os.stat(fp)
+
+ # touch it (mtime)
+ os.utime(fp, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # change owner
+ os.chown(fp, 1, -1)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_UID]))
+
+ # remove fifo
+ os.remove(fp)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_EXISTENCE]))
+
+ # restore it
+ os.mkfifo(fp, 0644)
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # change mode
+ os.chmod(fp, 0400)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE]))
+
+ # replace it with regular file
+ os.remove(fp)
+ with open(fp, 'w') as fobj:
+ fobj.write('dummy file')
+ os.utime(fp, (stats.st_atime, stats.st_mtime))
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(inst.FileType, FILE_TYPE_FILE)
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE]))
+
+ @swbase.test_with_packages('stable#pkg4')
+ def test_method_invoke_on_directory(self):
+ """
+ Test ``Invoke`` method of ``LMI_SoftwareIdentityFileCheck``
+ on directory.
+ """
+ pkg = self.get_repo('stable')['pkg4']
+ fp = '/usr/share/openlmi-sw-test-pkg4/perms'
+ objpath = self.make_op(pkg, fp)
+ inst = objpath.to_instance()
+ self.assertEqual(inst.FileType, FILE_TYPE_DIRECTORY)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "Invoke method should be successful for %s:%s" % (pkg, fp))
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED,
+ "InvokeOnSystem method should be successful for %s:%s"
+ % (pkg.name, fp))
+
+ # touch it (mtime)
+ os.utime(fp, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # remove child
+ os.remove(os.path.join(fp, 'rwxrwxrwx'))
+ os.utime(fp, None)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_SATISFIED)
+
+ # chmod
+ os.chmod(fp, 0755)
+ (rval, _, _) = inst.Invoke()
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ (rval, _, _) = inst.InvokeOnSystem(TargetSystem=self.system_iname)
+ self.assertEqual(rval, INVOKE_NOT_SATISFIED)
+ inst.refresh()
+ self.assertEqual(set(inst.FailedFlags), set([FAILED_FLAGS_MODE]))
def suite():
"""For unittest loaders."""
diff --git a/src/software/test/test_software_identity_resource.py b/src/software/test/test_software_identity_resource.py
index e6b5f66..8c18f62 100644
--- a/src/software/test/test_software_identity_resource.py
+++ b/src/software/test/test_software_identity_resource.py
@@ -19,42 +19,46 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_SoftwareIdentityResource provider.
+Unit tests for ``LMI_SoftwareIdentityResource`` provider.
"""
import pywbem
import time
import unittest
-from lmi.software.core.IdentityResource import Values
-from lmi.test.lmibase import enable_lmi_exceptions
-from lmi.test.util import mark_dangerous
-import base
+import swbase
import repository
-import util
-class TestSoftwareIdentityResource(
- base.SoftwareBaseTestCase): #pylint: disable=R0904
+ENABLED_DEFAULT_NOT_APPLICABLE = 5
+ENABLED_STATE_DISABLED = 3
+ENABLED_STATE_ENABLED = 2
+EXTENDED_RESOURCE_TYPE_RPM = 3
+HEALTH_STATE_MAJOR_FAILURE = 20
+HEALTH_STATE_OK = 5
+INFO_FORMAT_URL = 200
+OPERATIONAL_STATUS_ERROR = 6
+OPERATIONAL_STATUS_OK = 2
+REQUESTED_STATE_ENABLED = 2
+REQUESTED_STATE_DISABLED = 3
+REQUEST_STATE_CHANGE_SUCCESSFUL = 0
+RESOURCE_TYPE_OTHER = 1
+PRIMARY_STATUS_ERROR = 3
+PRIMARY_STATUS_OK = 1
+TRANSITIONING_TO_STATE_NOT_APPLICABLE = 12
+
+class TestSoftwareIdentityResource(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on ``LMI_SoftwareIdentityResource``.
"""
CLASS_NAME = "LMI_SoftwareIdentityResource"
KEYS = ("CreationClassName", "Name", "SystemCreationClassName",
"SystemName")
- @classmethod
- def needs_pkgdb(cls):
- return False
-
- @classmethod
- def needs_repodb(cls):
- return True
-
def make_op(self, repo):
"""
- @param ses SoftwareElementState property value
- @return object path of SoftwareIdentityResource
+ :returns: Object path of ``LMI_SoftwareIdentityResource``
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
if isinstance(repo, repository.Repository):
repo = repo.repoid
@@ -65,11 +69,6 @@ class TestSoftwareIdentityResource(
"CreationClassName" : self.CLASS_NAME
})
- def tearDown(self):
- for repo in self.repodb:
- if repository.is_repo_enabled(repo) != repo.status:
- repository.set_repo_enabled(repo, repo.status)
-
def _check_repo_instance(self, repo, inst):
"""
Checks instance properties of repository.
@@ -78,9 +77,9 @@ class TestSoftwareIdentityResource(
self.assertCIMNameEqual(objpath, inst.path)
for key in self.KEYS:
self.assertEqual(getattr(inst, key), getattr(inst.path, key))
- self.assertEqual(repo.repoid, inst.Name)
- self.assertIsInstance(inst.AccessContext, pywbem.Uint16)
+ self.assertEqual(inst.Name, repo.repoid)
+ self.assertIsInstance(inst.AccessContext, pywbem.Uint16)
access_info = inst.AccessInfo
if access_info is not None:
access_info = access_info.rstrip('/')
@@ -99,157 +98,293 @@ class TestSoftwareIdentityResource(
"AccessInfo missing in base_urls for repo %s" % repo.repoid)
self.assertIsInstance(inst.AvailableRequestedStates, list)
- self.assertEqual(repo.name, inst.Caption)
+ self.assertEqual(inst.Caption, repo.name)
self.assertIsInstance(inst.Cost, pywbem.Sint32)
self.assertIsInstance(inst.Description, basestring)
- self.assertEqual(repo.repoid, inst.ElementName)
- self.assertEqual(5, inst.EnabledDefault)
- self.assertEqual(2 if repo.status else 3, inst.EnabledState,
+ self.assertEqual(inst.ElementName, repo.repoid)
+ self.assertEqual(inst.EnabledDefault, ENABLED_DEFAULT_NOT_APPLICABLE)
+ self.assertEqual(
+ ENABLED_STATE_ENABLED
+ if repo.status else ENABLED_STATE_DISABLED,
+ inst.EnabledState,
"EnabledState does not match for repo %s" % repo.repoid)
- self.assertEqual(3, inst.ExtendedResourceType)
- if repo.revision is None:
- self.assertIsNone(inst.Generation)
- else:
- self.assertTrue(isinstance(inst.Generation, (int, long)))
+ self.assertEqual(inst.ExtendedResourceType, EXTENDED_RESOURCE_TYPE_RPM)
+ if repo.revision is not None:
+ self.assertTrue(isinstance(inst.Generation, (int, long)),
+ 'Generation must be integer for repo %s' % repo.repoid)
if repo.status:
- self.assertEqual(5, inst.HealthState) # OK
- self.assertEqual([2], inst.OperationalStatus) # OK
- self.assertEqual(1, inst.PrimaryStatus) # OK
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
else:
# repo may not be enabled, but still it can be ready
# (which is not documented), so let's check both posibilities
- self.assertIn(inst.HealthState, [5, 20]) # OK, Major Failure
- self.assertIn(inst.OperationalStatus, [[2], [6]]) # [OK], [Error]
- self.assertIn(inst.PrimaryStatus, [1, 3]) # OK, Error
+ # TODO: health state should be OK for disabled repository
+ self.assertIn(inst.HealthState,
+ [HEALTH_STATE_OK, HEALTH_STATE_MAJOR_FAILURE])
+ self.assertIn(inst.OperationalStatus,
+ [[OPERATIONAL_STATUS_OK], [OPERATIONAL_STATUS_ERROR]])
+ self.assertIn(inst.PrimaryStatus, [PRIMARY_STATUS_OK, PRIMARY_STATUS_ERROR])
self.assertIsInstance(inst.GPGCheck, bool)
- self.assertEqual(200, inst.InfoFormat)
+ self.assertEqual(inst.InfoFormat, INFO_FORMAT_URL)
self.assertEqual("LMI:LMI_SoftwareIdentityResource:"+repo.repoid,
inst.InstanceID)
- if repo.mirror_list is None:
- self.assertIsNone(inst.MirrorList)
- else:
- self.assertEqual(
- repo.metalink if repo.metalink else repo.mirror_list,
- inst.MirrorList)
+ self.assertEqual(inst.MirrorList, repo.mirror_list,
+ 'MirrorList must match for repo %s' % repo.repoid)
self.assertIsInstance(inst.OtherAccessContext, basestring)
- #self.assertEqual(repo.pkg_count, inst.PackageCount,
- #"PackageCount does not match for repo %s" % repo.repoid)
self.assertIsInstance(inst.RepoGPGCheck, bool)
- self.assertEqual(2 if repo.status else 3, inst.RequestedState)
- self.assertEqual(1, inst.ResourceType)
+ self.assertEqual(
+ ENABLED_STATE_ENABLED
+ if repo.status else ENABLED_STATE_DISABLED,
+ inst.RequestedState)
+ self.assertEqual(inst.ResourceType, RESOURCE_TYPE_OTHER)
self.assertIsInstance(inst.StatusDescriptions, list)
self.assertEqual(1, len(inst.StatusDescriptions))
- if repo.last_updated is None:
- self.assertIsNone(inst.TimeOfLastUpdate)
- else:
+ if repo.last_updated is not None:
time_stamp = repo.last_updated.replace(
microsecond=0, tzinfo=pywbem.cim_types.MinutesFromUTC(0))
self.assertGreaterEqual(inst.TimeOfLastUpdate.datetime, time_stamp)
- self.assertEqual(12, inst.TransitioningToState)
+ self.assertEqual(TRANSITIONING_TO_STATE_NOT_APPLICABLE,
+ inst.TransitioningToState)
- def test_get_instance_safe(self):
- """
- Tests GetInstance call.
- """
- for repo in self.repodb:
- objpath = self.make_op(repo)
- inst = objpath.to_instance()
- self._check_repo_instance(repo, inst)
-
- @mark_dangerous
+ @swbase.test_with_repos(stable=True, updates=False)
def test_get_instance(self):
"""
- Dangerous test, making sure, that properties are set correctly
- for enabled and disabled repositories.
+ Test ``GetInstance()`` call on ``LMI_SoftwareIdentityResource``.
"""
- for repo in self.repodb:
- objpath = self.make_op(repo)
- self.assertIs(repository.is_repo_enabled(repo), repo.status)
- inst = objpath.to_instance()
- self.assertCIMNameEqual(objpath, inst.path)
- for key in self.KEYS:
- self.assertEqual(getattr(inst, key), getattr(inst.path, key))
- self.assertEqual(repo.repoid, inst.Name)
- self.assertEqual(2 if repo.status else 3, inst.EnabledState)
- self.assertEqual(2 if repo.status else 3, inst.RequestedState)
- for repo in self.repodb:
- objpath = self.make_op(repo)
- time.sleep(1) # to make sure, that change will be noticed
- repository.set_repo_enabled(repo, not repo.status)
- self.assertIs(repository.is_repo_enabled(repo), not repo.status)
- inst = objpath.to_instance()
- self.assertEqual(repo.repoid, inst.Name)
- self.assertEqual(3 if repo.status else 2, inst.EnabledState)
- self.assertEqual(3 if repo.status else 2, inst.RequestedState)
+ stablerepo = self.get_repo('stable')
+ self.assertTrue(stablerepo.status, "stable repository is enabled")
+ objpath = self.make_op(stablerepo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ "GetInstance() call on stable repository is successful"
+ ' with path: "%s"' % str(objpath))
+ self._check_repo_instance(stablerepo, inst)
+ self.assertEqual(set(self.KEYS), set(inst.path.key_properties()))
+
+ updaterepo = self.get_repo('updates')
+ objpath = self.make_op(updaterepo)
+ self.assertFalse(updaterepo.status, "updates repository is disabled")
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on updates repository is successful'
+ ' with path "%s"' % str(objpath))
+ self._check_repo_instance(updaterepo, inst)
+ self.assertEqual(set(self.KEYS), set(inst.path.key_properties()))
def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames call on all repositories.
+ Test ``EnumInstanceNames()`` call on ``LMI_SoftwareIdentityResource``.
"""
+ all_repo_names = set(self.repodb.keys()).union(
+ set(self.other_repos.keys()))
inames = self.cim_class.instance_names()
- self.assertEqual(len(inames), len(self.repodb))
- repoids = set(r.repoid for r in self.repodb)
for iname in inames:
- self.assertEqual(iname.namespace, 'root/cimv2')
self.assertEqual(set(iname.key_properties()), set(self.KEYS))
- self.assertIn(iname.Name, repoids)
- objpath = self.make_op(iname.Name)
- self.assertCIMNameEqual(objpath, iname)
+ self.assertIn(iname.Name, all_repo_names)
+ all_repo_names.remove(iname.Name)
+ self.assertEqual(iname.SystemCreationClassName, self.system_cs_name)
+ self.assertEqual(iname.CreationClassName, self.CLASS_NAME)
+ self.assertEqual(len(all_repo_names), 0)
def test_enum_instances(self):
"""
- Tests EnumInstances call on all repositories.
+ Test ``EnumInstances()`` call on ``LMI_SoftwareIdentityResource``.
"""
+ all_repo_names = set(self.repodb.keys()).union(
+ set(self.other_repos.keys()))
insts = self.cim_class.instances()
- self.assertGreater(len(insts), 0)
- repoids = dict((r.repoid, r) for r in self.repodb)
for inst in insts:
- self.assertIn(inst.Name, repoids)
- self._check_repo_instance(repoids[inst.Name], inst)
+ self.assertEqual(set(inst.path.key_properties()), set(self.KEYS))
+ self.assertIn(inst.Name, all_repo_names)
+ all_repo_names.remove(inst.Name)
+ if inst.Name in self.repodb:
+ repo = self.repodb[inst.Name]
+ else:
+ repo = self.other_repos[inst.Name]
+ self._check_repo_instance(repo, inst)
+ self.assertEqual(len(all_repo_names), 0)
- @enable_lmi_exceptions
- @mark_dangerous
- def test_request_state_change(self):
+ @swbase.test_with_repos('updates')
+ def test_disable_repo(self):
"""
- Tests InvokeMethod on RequestStateChange().
+ Test whether ``LMI_SoftwareIdentityResource`` provider recognizes
+ that repository has been disabled.
"""
- for repo in self.repodb:
- objpath = self.make_op(repo)
- self.assertIs(repository.is_repo_enabled(repo), repo.status)
+ repo = self.get_repo('updates')
+ self.assertTrue(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on updates repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
- # change state of repository (enabled/disabled)
- req_state = ( Values.RequestStateChange.RequestedState.Disabled
- if repo.status else
- Values.RequestStateChange.RequestedState.Enabled)
- inst = objpath.to_instance()
- (res, oparms, _) = inst.RequestStateChange(RequestedState=req_state)
- self.assertEqual(0, res) #Completed with no error
- self.assertEqual(0, len(oparms))
- inst.refresh()
- self.assertEqual(inst.EnabledState, req_state)
- for pkg in self.dangerous_pkgs:
- if pkg.up_repo == repo:
- pkg_iname = util.make_identity_path(pkg)
- self.assertRaisesCIM(pywbem.CIM_ERR_NOT_FOUND,
- objpath.to_instance)
+ repository.set_repo_enabled(repo, False)
+ repo.refresh()
+ self.assertFalse(repo.status)
+ time.sleep(0.1)
+ inst.refresh()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on updates repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+
+ @swbase.test_with_repos(updates=False)
+ def test_enable_repo(self):
+ """
+ Test whether ``LMI_SoftwareIdentityResource`` provider recognizes that
+ repository has been enabled.
+ """
+ repo = self.get_repo('updates')
+ self.assertFalse(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on updates repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+ repository.set_repo_enabled(repo, True)
+ repo.refresh()
+ self.assertTrue(repo.status)
+ inst.refresh()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on updates repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+ @swbase.test_with_repos('stable')
+ def test_disable_enabled_repo(self):
+ """
+ Test ``RequestStateChange()`` method invocation on
+ ``LMI_SoftwareIdentityResource`` by requesting disablement of
+ repository.
+ """
+ repo = self.get_repo('stable')
+ self.assertTrue(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on stable repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+
+ # try to disable it with method invocation
+ (rval, oparms, _) = inst.RequestStateChange(
+ RequestedState=REQUESTED_STATE_DISABLED)
+ self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL)
+ self.assertEqual(len(oparms), 0)
+ repo.refresh()
+ self.assertFalse(repo.status)
+ inst.refresh()
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+ @swbase.test_with_repos(stable=False)
+ def test_enable_disabled_repo(self):
+ """
+ Test ``RequestStateChange()`` method invocation on
+ ``LMI_SoftwareIdentityResource`` by requesting enablement of
+ repository.
+ """
+ repo = self.get_repo('stable')
+ self.assertFalse(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on stable repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
+
+ # try to enable it with method invocation
+ (rval, oparms, _) = inst.RequestStateChange(
+ RequestedState=REQUESTED_STATE_ENABLED)
+ self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL)
+ self.assertEqual(len(oparms), 0)
+ repo.refresh()
+ self.assertTrue(repo.status)
+ inst.refresh()
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+ @swbase.test_with_repos('stable')
+ def test_enable_enabled_repo(self):
+ """
+ Test ``RequestStateChange()`` method invocation on
+ ``LMI_SoftwareIdentityResource`` by requesting enablement of
+ enabled repository.
+ """
+ repo = self.get_repo('stable')
+ self.assertTrue(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on stable repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+
+ # try to enable it with method invocation
+ (rval, oparms, _) = inst.RequestStateChange(
+ RequestedState=REQUESTED_STATE_ENABLED)
+ self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL)
+ self.assertEqual(len(oparms), 0)
+ repo.refresh()
+ self.assertTrue(repo.status)
+ inst.refresh()
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_ENABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
+
+ @swbase.test_with_repos(stable=False)
+ def test_disable_disabled_repo(self):
+ """
+ Test ``RequestStateChange()`` method invocation on
+ ``LMI_SoftwareIdentityResource`` by requesting disablement of
+ disabled repository.
+ """
+ repo = self.get_repo('stable')
+ self.assertFalse(repo.status)
+ objpath = self.make_op(repo)
+ inst = objpath.to_instance()
+ self.assertNotEqual(inst, None,
+ 'GetInstance() call on stable repository is successful'
+ ' with path "%s"' % str(objpath))
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
- # revert back to original configuration
- req_state = ( Values.RequestStateChange.RequestedState.Enabled
- if repo.status else
- Values.RequestStateChange.RequestedState.Disabled)
- (res, oparms, _) = inst.RequestStateChange(RequestedState=req_state)
- self.assertEqual(0, res) #Completed with no error
- self.assertEqual(0, len(oparms))
- inst.refresh()
- self.assertEqual(inst.EnabledState, req_state)
- for pkg in self.dangerous_pkgs:
- if pkg.up_repo == repo:
- pkg_iname = util.make_identity_path(pkg)
- inst = pkg_iname.to_instance()
- self.assertNotEqual(inst, None)
+ # try to enable it with method invocation
+ (rval, oparms, _) = inst.RequestStateChange(
+ RequestedState=REQUESTED_STATE_DISABLED)
+ self.assertEqual(rval, REQUEST_STATE_CHANGE_SUCCESSFUL)
+ self.assertEqual(len(oparms), 0)
+ repo.refresh()
+ self.assertFalse(repo.status)
+ inst.refresh()
+ self.assertEqual(inst.EnabledState, ENABLED_STATE_DISABLED)
+ self.assertEqual(inst.HealthState, HEALTH_STATE_OK)
+ self.assertEqual(inst.OperationalStatus, [OPERATIONAL_STATUS_OK])
+ self.assertEqual(inst.PrimaryStatus, PRIMARY_STATUS_OK)
def suite():
- """For unittest loaders."""
+ """ For unittest loaders. """
return unittest.TestLoader().loadTestsFromTestCase(
TestSoftwareIdentityResource)
diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py
index ae3236a..bcd184e 100644
--- a/src/software/test/test_system_software_collection.py
+++ b/src/software/test/test_system_software_collection.py
@@ -19,17 +19,16 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-Unit tests for LMI_SoftwareIdentity provider.
+Unit tests for ``LMI_SystemSoftwareCollection`` provider.
"""
import unittest
-import base
+import swbase
-class TestSystemSoftwareCollection(
- base.SoftwareBaseTestCase): #pylint: disable=R0904
+class TestSystemSoftwareCollection(swbase.SwTestCase):
"""
- Basic cim operations test.
+ Basic cim operations test on ``LMI_SystemSoftwareCollection``.
"""
CLASS_NAME = "LMI_SystemSoftwareCollection"
@@ -37,8 +36,8 @@ class TestSystemSoftwareCollection(
def make_op(self):
"""
- @param ses SoftwareElementState property value
- @return object path of SoftwareIdentity
+ :returns Object path of ``LMI_SystemSoftwareCollection``.
+ :rtype: :py:class:`lmi.shell.LMIInstanceName`
"""
return self.cim_class.new_instance_name({
"InstanceID" : "LMI:LMI_SystemSoftwareCollection"
@@ -46,7 +45,7 @@ class TestSystemSoftwareCollection(
def test_get_instance(self):
"""
- Tests GetInstance call on packages from our rpm cache.
+ Test ``GetInstance()`` call on ``LMI_SystemSoftwareCollection``.
"""
objpath = self.make_op()
inst = objpath.to_instance()
@@ -58,7 +57,7 @@ class TestSystemSoftwareCollection(
def test_enum_instance_names(self):
"""
- Tests EnumInstanceNames call on installed packages.
+ Test ``EnumInstanceNames()`` call on ``LMI_SystemSoftwareCollection``.
"""
inames = self.cim_class.instance_names()
self.assertEqual(len(inames), 1)
@@ -67,7 +66,7 @@ class TestSystemSoftwareCollection(
def test_enum_instances(self):
"""
- Tests EnumInstances call on installed packages.
+ Test ``EnumInstances()`` call on ``LMI_SystemSoftwareCollection``.
"""
insts = self.cim_class.instances()
self.assertEqual(len(insts), 1)
diff --git a/src/software/test/util.py b/src/software/test/util.py
index c0cb4f0..3c63be0 100644
--- a/src/software/test/util.py
+++ b/src/software/test/util.py
@@ -49,17 +49,17 @@ def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
"""
estr = ''
if with_epoch.lower() == "always":
- estr = epoch
+ estr = str(epoch)
elif with_epoch.lower() == "not_zero":
- if epoch and epoch.lower() not in {"0", "(none)"}:
- estr = epoch
+ if epoch and str(epoch).lower() not in {"0", "(none)"}:
+ estr = str(epoch)
if len(estr):
estr += ":"
return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
def make_evra(epoch, ver, rel, arch):
""" @return evra string """
- if not epoch or epoch.lower() == "(none)":
+ if not epoch or str(epoch).lower() == "(none)":
epoch = "0"
return "%s:%s-%s.%s" % (epoch, ver, rel, arch)