diff options
Diffstat (limited to 'src/software/test')
-rw-r--r-- | src/software/test/base.py (renamed from src/software/test/common.py) | 153 | ||||
-rw-r--r-- | src/software/test/package.py | 117 | ||||
-rw-r--r-- | src/software/test/rpmcache.py | 422 | ||||
-rwxr-xr-x | src/software/test/run.py | 23 | ||||
-rwxr-xr-x | src/software/test/test_hosted_software_collection.py | 155 | ||||
-rwxr-xr-x | src/software/test/test_member_of_software_collection.py | 155 | ||||
-rwxr-xr-x | src/software/test/test_software_file_check.py | 527 | ||||
-rwxr-xr-x | src/software/test/test_software_identity.py | 154 | ||||
-rwxr-xr-x | src/software/test/test_software_installed_package.py | 351 | ||||
-rwxr-xr-x | src/software/test/test_software_package.py | 153 | ||||
-rwxr-xr-x | src/software/test/test_system_software_collection.py | 86 | ||||
-rw-r--r-- | src/software/test/util.py | 82 |
12 files changed, 1070 insertions, 1308 deletions
diff --git a/src/software/test/common.py b/src/software/test/base.py index d91f236..4e56f46 100644 --- a/src/software/test/common.py +++ b/src/software/test/base.py @@ -21,61 +21,15 @@ Common utilities and base class for all software tests. """ +import itertools import os import pywbem -import re import tempfile import unittest -from subprocess import call, check_output +from subprocess import check_output import rpmcache -RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' - r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') - -def remove_pkg(pkg, *args): - """ - Remove package with rpm command. - @param pkg is either instance of Package or package name - @param args is a list of parameters for rpm command - """ - if isinstance(pkg, rpmcache.Package): - pkg = pkg.name - call(["rpm", "--quiet"] + list(args) + ["-e", pkg]) - -def install_pkg(pkg, newer=True, repolist=[]): - """ - Install a specific package. - @param pkg is either package name or instance of Package - In latter case, a specific version is installed. - @param repolist is a list of repositories, that should be - used for downloading, if using yum - when empty, all enabled repositories are used - """ - if isinstance(pkg, rpmcache.Package): - try: - rpm_name = rpmcache.get_rpm_name(pkg, newer=newer) - call(["rpm", "--quiet", "-i", rpm_name]) - return - except rpmcache.MissingRPM: - pass - pkg = pkg.name - rpmcache.run_yum('-q', '-y', 'install', pkg, repolist=repolist) - -def is_installed(pkg, newer=True): - """ - Check, whether package is installed. - Accepts the same parameters as install_pkg. - @see install_pkg - """ - return rpmcache.is_installed(pkg, newer) - -def verify_pkg(name): - """ - @return output of command rpm, with verification output for package - """ - return call(["rpm", "--quiet", "-Va", name]) == 0 - def mark_dangerous(method): """ Decorator for methods of unittest.TestCase subclasses, that @@ -86,21 +40,17 @@ def mark_dangerous(method): else: return unittest.skip("This test is marked as dangerous.")(method) -def is_config_file(pkg, file_path): - """ - @return True, if file_path is a configuration file of package pkg. - """ - cmd = ['rpm', '-qc', pkg.name] - out = check_output(cmd) - return file_path in set(out.splitlines()) #pylint: disable=E1103 - -def is_doc_file(pkg, file_path): +def mark_tedious(method): """ - @return True, if file_path is a documentation file of package pkg. + Decorator for methods of unittest.TestCase subclasses, that + skips tedious tests. Those running for very long time and usually + need a lot of memory. Environment variable "LMI_RUN_TEDIOUS" can + allow them """ - cmd = ['rpm', '-qd', pkg.name] - out = check_output(cmd) - return file_path in set(out.splitlines()) #pylint: disable=E1103 + if os.environ.get('LMI_RUN_TEDIOUS', '0') == '1': + return method + else: + return unittest.skip("This test is marked as tedious.")(method) def get_pkg_files(pkg): """ @@ -124,9 +74,9 @@ def get_pkg_files(pkg): elif len(symlinks) == 0 and os.path.islink(fpath): symlinks.add(fpath) elif not os.path.islink(fpath) and os.path.isfile(fpath): - if len(configs) == 0 and is_config_file(pkg, fpath): + if len(configs) == 0 and rpmcache.has_pkg_config_file(pkg, fpath): configs.add(fpath) - elif len(docs) == 0 and is_doc_file(pkg, fpath): + elif len(docs) == 0 and rpmcache.has_pkg_doc_file(pkg, fpath): docs.add(fpath) elif len(files) == 0: files.add(fpath) @@ -142,6 +92,27 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 CLASS_NAME = "Define in subclass" + # will be filled when first needed + # it's a dictionary with items (pkg_name, [file_path1, ...]) + PKGDB_FILES = None + + @classmethod + def get_pkgdb_files(cls): + """ + @return dictionary { pkg_name: ["file_path1, ...] } + """ + if cls.PKGDB_FILES is not None: + return cls.PKGDB_FILES + SoftwareBaseTestCase.PKGDB_FILES = res = dict( + (pkg.name, get_pkg_files(pkg)) for pkg in itertools.chain( + cls.safe_pkgs, cls.dangerous_pkgs)) + return res + + @classmethod + def needs_pkgdb_files(cls): + """subclass may override this, if it needs PKGDB_FILES database""" + return False + def __init__(self, *args, **kwargs): unittest.TestCase.__init__(self, *args, **kwargs) self.longMessage = True @@ -152,9 +123,20 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 namespace="root/cimv2", classname=self.CLASS_NAME) def install_pkg(self, pkg, newer=True, repolist=None): + """ + Use this method instead of function in rpmcache in tests. + """ + if repolist is None: + repolist = self.test_repos + return rpmcache.install_pkg(pkg, newer, repolist) + + def ensure_pkg_installed(self, pkg, newer=True, repolist=None): + """ + Use this method instead of function in rpmcache in tests. + """ if repolist is None: repolist = self.test_repos - return install_pkg(pkg, newer, repolist) + return rpmcache.ensure_pkg_installed(pkg, newer, repolist) def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103 """ @@ -170,19 +152,17 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 "root/cimv2", base_cls, cls)) def assertEqual(self, fst, snd, *args, **kwargs): + """ + Modify assertEqual for instance names comparing only important + properties. + """ if ( isinstance(fst, pywbem.CIMInstanceName) and isinstance(snd, pywbem.CIMInstanceName) - and fst.classname == "LMI_SoftwarePackage" and fst.classname == snd.classname and fst.namespace == snd.namespace - and fst.keys() == snd.keys() - and all(fst[k] == snd[k] for k in ("Name", "SoftwareElementID", - "SoftwareElementState", "Version")) - and isinstance(fst["TargetOperatingSystem"], (int, long)) - and isinstance(snd["TargetOperatingSystem"], (int, long))): + and fst.keybindings == snd.keybindings): return True - return unittest.TestCase.assertEqual( - self, fst, snd, *args, **kwargs) + unittest.TestCase.assertEqual(self, fst, snd, *args, **kwargs) @classmethod def setUpClass(cls): @@ -193,11 +173,8 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password)) cls.run_dangerous = ( os.environ.get('LMI_RUN_DANGEROUS', '0') == '1') - cls.test_repos = os.environ.get('LMI_SOFTWARE_TEST_REPOS', '') - if not cls.test_repos: - cls.test_repos = [] - else: - cls.test_repos = cls.test_repos.split(',') + cls.test_repos = os.environ.get( + 'LMI_SOFTWARE_TEST_REPOS', '').split(',') use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1' cls.cache_dir = None if use_cache: @@ -210,19 +187,23 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 os.makedirs(cls.cache_dir) # rpm packages are expected to be in CWD os.chdir(cls.cache_dir) - cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache) - for pkg in cls.pkgdb: - if not is_installed(pkg.name): - install_pkg(pkg, repolist=cls.test_repos) - cls.pkg_files = dict((pkg.name, get_pkg_files(pkg)) - for pkg in cls.pkgdb) + cls.safe_pkgs, cls.dangerous_pkgs = rpmcache.get_pkg_database( + use_cache=use_cache, + dangerous=cls.run_dangerous, + repolist=cls.test_repos, + cache_dir=cls.cache_dir if use_cache else None) + for pkg in cls.dangerous_pkgs: + if not rpmcache.is_pkg_installed(pkg.name): + rpmcache.install_pkg(pkg, repolist=cls.test_repos) + if cls.needs_pkgdb_files(): + cls.pkgdb_files = cls.get_pkgdb_files() @classmethod def tearDownClass(cls): if cls.run_dangerous: - for pkg in cls.pkgdb: - if is_installed(pkg.name): - remove_pkg(pkg.name) + for pkg in cls.dangerous_pkgs: + if rpmcache.is_pkg_installed(pkg.name): + rpmcache.remove_pkg(pkg.name) if hasattr(cls, "prev_dir"): os.chdir(cls.prev_dir) diff --git a/src/software/test/package.py b/src/software/test/package.py new file mode 100644 index 0000000..997ee34 --- /dev/null +++ b/src/software/test/package.py @@ -0,0 +1,117 @@ +#!/usr/bin/python +# -*- Coding:utf-8 -*- +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. # +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +""" +Abstraction for RPM package for test purposes. +""" + +import util + +class Package(object): #pylint: disable=R0902 + """ + Element of test package database. It's a container for package + informations. It contains two sets of versions for single package. + That's meant for updating tests. + """ + def __init__(self, name, epoch, ver, rel, arch, repo, + **kwargs): + """ + Arguments prefixed with 'up_' are for newer package. + """ + self._name = name + if not epoch or epoch.lower() == "(none)": + epoch = "0" + self._epoch = epoch + self._ver = ver + self._rel = rel + self._arch = arch + self._repo = repo + safe = kwargs.get('safe', False) + self._up_epoch = epoch if safe else kwargs.get('up_epoch', epoch) + self._up_ver = ver if safe else kwargs.get('up_ver' , ver) + self._up_rel = rel if safe else kwargs.get('up_rel' , rel) + self._up_repo = repo if safe else kwargs.get('up_repo', repo) + + def __str__(self): + return self.get_nevra() + + @property + def name(self): return self._name #pylint: disable=C0111,C0321 + @property + def epoch(self): return self._epoch #pylint: disable=C0111,C0321 + @property + def ver(self): return self._ver #pylint: disable=C0111,C0321 + @property + def rel(self): return self._rel #pylint: disable=C0111,C0321 + @property + def arch(self): return self._arch #pylint: disable=C0111,C0321 + @property + def repo(self): return self._repo #pylint: disable=C0111,C0321 + @property + def nevra(self): #pylint: disable=C0111,C0321 + return self.get_nevra(False) + @property + def evra(self): #pylint: disable=C0111,C0321 + return self.get_evra(False) + + @property + def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321 + @property + def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321 + @property + def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321 + @property + def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321 + @property + def up_nevra(self): #pylint: disable=C0111,C0321 + return self.get_nevra(True) + @property + def up_evra(self): #pylint: disable=C0111,C0321 + return self.get_evra(True) + + @property + def is_safe(self): + """ + @return True if properties prefixed with up_ matches those without + it. In that case a package is suited for non-dangerous tests. + """ + return all( getattr(self, a) == getattr(self, 'up_' + a) + for a in ('epoch', 'ver', 'rel', 'repo')) + + def get_nevra(self, newer=True, with_epoch='NOT_ZERO'): + """ + @newer if True, evr part is made from properties prefixed with 'up_' + @return pkg nevra string + """ + if newer: + attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch'] + else: + attrs = ['name', 'epoch', 'ver', 'rel', 'arch'] + return util.make_nevra(*[getattr(self, '_'+a) for a in attrs], + with_epoch=with_epoch) + + def get_evra(self, newer=True): + """ + @newer if True, evr part is made from properties prefixed with 'up_' + @return pkg nevra string + """ + attrs = [('up_' if newer else '')+a for a in ('epoch', 'ver', 'rel')] + attrs.append('arch') + return util.make_evra(*[getattr(self, '_'+a) for a in attrs]) + diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py index 9a0e98a..c2dde24 100644 --- a/src/software/test/rpmcache.py +++ b/src/software/test/rpmcache.py @@ -18,49 +18,59 @@ # # Authors: Radek Novacek <rnovacek@redhat.com> # Authors: Michal Minar <miminar@redhat.com> + """ Creation and manipulation utilities with rpm cache for software tests. """ + import copy import datetime import os import pickle +import random import re from collections import defaultdict from subprocess import call, check_output, CalledProcessError +from package import Package +import util + DB_BACKUP_FILE = 'lmi_software_test_cache' RE_AVAIL_PKG = re.compile( r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' - r'-(?P<release>[a-zA-Z0-9_.]+)\s+' - r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE) + r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' + r'-(?P<rel>[a-zA-Z0-9_.]+)\s+' + r'(?P<repo>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE) # this won't match the last entry, unless "package\n" is not appended # at the end of the string RE_PKG_DEPS = re.compile( r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' - r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)' + r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' + r'-(?P<rel>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)' r'(?=^package|\Z)', re.MULTILINE | re.DOTALL) RE_DEPS_PROVIDERS = re.compile( r'^\s*dependency:\s*(?P<dependency>.+?)\s*' - r'(?P<providers>(^\s+provider:.+?$)+)', re.MULTILINE | re.IGNORECASE) + r'(?P<providers>(^\s+unsatisfied\s+dependency$)|(^\s+provider:.+?$)+)', + re.MULTILINE | re.IGNORECASE) +RE_DEPS_UNSATISFIED = re.compile(r'^\s+unsatisfied\s+dep.*$', re.IGNORECASE) RE_PROVIDER = re.compile( r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' - r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' - r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE) + r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)' + r'-(?P<rel>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE) RE_PKG_INFO = re.compile( r'^Name\s*:\s*(?P<name>[^\s]+).*?' r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?' - r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+' - r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?' + r'^Version\s*:\s*(?P<ver>[a-zA-Z0-9._+-]+)\s+' + r'^Release\s*:\s*(?P<rel>[^\s]+)\s+.*?' r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?', re.MULTILINE | re.DOTALL | re.IGNORECASE) RE_REPO = re.compile( r'(?:^\*?)(?P<name>[^\s/]+\b)(?!\s+id)', re.MULTILINE | re.IGNORECASE) -# maximum number of packages, that will be selected for testing +# Maximum number of packages, that will be selected for testing / 2 +# There are 2 sets of test packages (safe and dangerous). When running +# in dangerous mode, the resulting number will be twice as much. MAX_PKG_DB_SIZE = 10 # step used to iterate over package names used to check for thery dependencies # it's a number of packages, that will be passed to yum command at once @@ -78,104 +88,6 @@ class MissingRPM(InvalidTestCache): InvalidTestCache.__init__(self, "Missing package '%s' in test cache!"%pkg_name) -def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): - """ - @param with_epoch may be one of: - "NOT_ZERO" - include epoch only if it's not zero - "ALWAYS" - include epoch always - "NEVER" - do not include epoch at all - """ - estr = '' - if with_epoch.lower() == "always": - estr = epoch - elif with_epoch.lower() == "not_zero": - if epoch != "0": - estr = epoch - if len(estr): - estr += ":" - return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) - -def run_yum(*params, **kwargs): - """ - Runs yum with params and returns its output - It's here especially to allow pass a repolist argument, that - specifies list of repositories, to run the command on. - """ - cmd = ['yum'] + list(params) - repolist = kwargs.get('repolist', []) - if repolist: - cmd += ['--disablerepo=*'] - cmd += ['--enablerepo='+r for r in repolist] - try: - return check_output(cmd) - except Exception: - import pdb;pdb.set_trace() - -class Package(object): #pylint: disable=R0902 - """ - Element of test package database. It's a container for package - informations. It contains two sets of versions for single package. - That's meant for updating tests. - """ - def __init__(self, name, epoch, ver, rel, arch, repo, - up_epoch, up_ver, up_rel, up_repo): - """ - Arguments prefixed with 'up_' are for newer package. - """ - self._name = name - self._epoch = epoch - self._ver = ver - self._rel = rel - self._arch = arch - self._repo = repo - self._up_epoch = up_epoch - self._up_ver = up_ver - self._up_rel = up_rel - self._up_repo = up_repo - - def __str__(self): - return self.get_nevra() - - @property - def name(self): return self._name #pylint: disable=C0111,C0321 - @property - def epoch(self): return self._epoch #pylint: disable=C0111,C0321 - @property - def ver(self): return self._ver #pylint: disable=C0111,C0321 - @property - def rel(self): return self._rel #pylint: disable=C0111,C0321 - @property - def arch(self): return self._arch #pylint: disable=C0111,C0321 - @property - def repo(self): return self._repo #pylint: disable=C0111,C0321 - @property - def nevra(self): #pylint: disable=C0111,C0321 - return self.get_nevra(True) - - @property - def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321 - @property - def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321 - @property - def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321 - @property - def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321 - @property - def up_nevra(self): #pylint: disable=C0111,C0321 - return self.get_nevra(True) - - def get_nevra(self, newer=True, with_epoch='NOT_ZERO'): - """ - @newer if True, evr part is made from properties prefixed with 'up_' - @return pkg nevra string - """ - if newer: - attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch'] - else: - attrs = ['name', 'epoch', 'ver', 'rel', 'arch'] - return make_nevra(*[getattr(self, '_'+a) for a in attrs], - with_epoch=with_epoch) - def _match_nevr(match): """ @param match is a regexp match object with parsed rpm package @@ -183,22 +95,33 @@ def _match_nevr(match): """ epoch = match.group('epoch') return ( match.group('name') - , epoch[:-1] if epoch else "0" - , match.group('version') - , match.group('release')) + , epoch if epoch and epoch.lower() != "(none)" else "0" + , match.group('ver') + , match.group('rel')) -def _match2pkg(match): +def _match2pkg(match, safe=False): """ @param match is a regexp match object with attributes: name, epoch, version, release, arch + @param safe if True, the packe will have up_* properties equal to + non-prefixed ones, otherwise they will be set to None @return instance of Package """ + kwargs = {} + if safe is True: + kwargs['safe'] = True + else: + for attr in ('epoch', 'ver', 'rel', 'repo'): + kwargs['up_'+attr] = None + epoch = match.group('epoch') + if not epoch or epoch.lower() == "(none)": + epoch = '0' return Package( match.group('name'), - match.group('epoch')[:-1] if match.group('epoch') else '0', - match.group('version'), match.group('release'), - match.group('arch'), match.groupdict().get('repository', None), - None, None, None, None) + epoch, + match.group('ver'), match.group('rel'), + match.group('arch'), match.groupdict().get('repo', None), + **kwargs) def _filter_duplicates(installed, avail_str): """ @@ -213,7 +136,7 @@ def _filter_duplicates(installed, avail_str): dups_list = [] cur_package_matches = [] prev_match = None - system_arch = get_system_architecture() + system_arch = util.get_system_architecture() for match in RE_AVAIL_PKG.finditer(avail_str): if ( _match_nevr(match) in [ _match_nevr(m) for m in cur_package_matches] @@ -246,12 +169,14 @@ def _check_single_pkg_deps( """ for match_deps in RE_DEPS_PROVIDERS.finditer(dependencies_str): providers = [] + if RE_DEPS_UNSATISFIED.search(match_deps.group('providers')): + return False for match_dep in RE_PROVIDER.finditer(match_deps.group('providers')): if match_dep.group('name') not in installed: continue providers.append(_match2pkg(match_dep)) for provider in providers: - if is_installed(provider, False): + if is_pkg_installed(provider, False): break else: # no provider is installed return False @@ -261,7 +186,7 @@ def _check_pkg_dependencies( installed, dup_list, number_of_packages=MAX_PKG_DB_SIZE, - repolist=[]): + repolist=None): """ Finds packages from dup_list with satisfied (installed) dependencies. @param installed is a set of installed package names @@ -274,7 +199,7 @@ def _check_pkg_dependencies( yum_params = yum_params[:1] for dups in dups_part: yum_params.extend([d.get_nevra(newer=False) for d in dups]) - deplist_str = run_yum(*yum_params, repolist=repolist) + deplist_str = util.run_yum(*yum_params, repolist=repolist) matches = RE_PKG_DEPS.finditer(deplist_str) prev_match = None for pkgs in dups_part: @@ -300,14 +225,14 @@ def _check_pkg_dependencies( break return dups_no_deps -def _sorted_db_by_size(pkgdb, repolist=[]): +def _sorted_db_by_size(pkgdb, repolist=None): """ @param pkgdb is a list of lists of packages with common name @return sorted instances of Package according to their size """ yum_params = ['info', '--showduplicates'] yum_params.extend([ps[0].name for ps in pkgdb]) - info_str = run_yum(*yum_params, repolist=repolist) + info_str = util.run_yum(*yum_params, repolist=repolist) pkg_sizes = {} # to get correct ordering from "yum info" command # { pkg_name : [(epoch, version, release), ... ] } @@ -329,7 +254,7 @@ def _sorted_db_by_size(pkgdb, repolist=[]): if not epoch: epoch = "0" pkg_version_order[pkg_name].append(( - epoch, info_match.group('version'), info_match.group('release'))) + epoch, info_match.group('ver'), info_match.group('rel'))) pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[ :MAX_PKG_DB_SIZE] @@ -350,7 +275,7 @@ def _get_repo_list(): repos_str = check_output(['yum', 'repolist', '-q']) return RE_REPO.findall(repos_str) -def _download_pkgdb(repolist, pkgdb, cache_dir=None): +def _download_dangerous(repolist, pkgdb, cache_dir=None): """ Downloads all rpm packages (old and newer versions) from package database to current directory. @@ -385,32 +310,6 @@ def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False): with_epoch='NEVER' if without_epoch else 'NOT_ZERO') return os.path.join(cache_dir, nevra) + '.rpm' -def is_installed(pkg, newer=True): - """ - Check, whether package is installed. - """ - if not isinstance(pkg, Package): - return call(["rpm", "--quiet", "-q", pkg]) == 0 - else: - try: - cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}", pkg.get_nevra( - newer, with_epoch='NEVER') ] - out = check_output(cmd) - epoch, nvra = out.split(':') #pylint: disable=E1103 - if not epoch or epoch == "(none)": - epoch = "0" - return ( epoch == pkg.epoch - and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER")) - except CalledProcessError: - return False - -def rpm_exists(pkg, cache_dir='', newer=True): - """ - @return True, when rpm package is in cache. - """ - return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer)) - or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True))) - def get_rpm_name(pkg, cache_dir='', newer=True): """ Some packages do not have epoch in their name, even if it's higher than @@ -425,56 +324,126 @@ def get_rpm_name(pkg, cache_dir='', newer=True): return path raise MissingRPM(pkg.name) -def get_system_architecture(): +def rpm_exists(pkg, cache_dir='', newer=True): + """ + @return True, when rpm package is in cache. + """ + return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer)) + or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True))) + + +def remove_pkg(pkg, *args): + """ + Remove package with rpm command. + @param pkg is either instance of Package or package name + @param args is a list of parameters for rpm command + """ + if isinstance(pkg, Package): + pkg = pkg.name + call(["rpm", "--quiet"] + list(args) + ["-e", pkg]) + +def install_pkg(pkg, newer=True, repolist=None): """ - @return the system architecture name as seen by rpm + Install a specific package. + @param pkg is either package name or instance of Package + In latter case, a specific version is installed. + @param repolist is a list of repositories, that should be + used for downloading, if using yum + when empty, all enabled repositories are used """ - return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm']) + if isinstance(pkg, Package): + try: + rpm_name = get_rpm_name(pkg, newer=newer) + call(["rpm", "--quiet", "-i", rpm_name]) + return + except MissingRPM: + pass + pkg = pkg.name + util.run_yum('-q', '-y', 'install', pkg, repolist=repolist) + +def ensure_pkg_installed(pkg, newer=True, repolist=None): + """ + Ensures, that specific version of package is installed. If other + version is installed, it is removed and reinstalled. + """ + if not isinstance(pkg, Package): + raise TypeError("pkg must be a Package instance") + if not is_pkg_installed(pkg, newer): + if is_pkg_installed(pkg.name): + remove_pkg(pkg.name) + install_pkg(pkg, newer, repolist) -def write_pkgdb(pkgdb, cache_dir=''): +def verify_pkg(pkg): + """ + @return output of command rpm, with verification output for package + """ + if isinstance(pkg, basestring): + name = pkg + elif isinstance(pkg, Package): + name = pkg.name + else: + raise TypeError("pkg must be either package name or Package instance") + return call(["rpm", "--quiet", "-Va", name]) == 0 + +def has_pkg_config_file(pkg, file_path): + """ + @return True, if file_path is a configuration file of package pkg. + """ + cmd = ['rpm', '-qc', pkg.name] + out = check_output(cmd) + return file_path in set(out.splitlines()) #pylint: disable=E1103 + +def has_pkg_doc_file(pkg, file_path): + """ + @return True, if file_path is a documentation file of package pkg. + """ + cmd = ['rpm', '-qd', pkg.name] + out = check_output(cmd) + return file_path in set(out.splitlines()) #pylint: disable=E1103 + +def is_pkg_installed(pkg, newer=True): + """ + Check, whether package is installed. + """ + if not isinstance(pkg, Package): + return call(["rpm", "--quiet", "-q", pkg]) == 0 + else: + cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra( + newer, with_epoch='NEVER') ] + try: + out = check_output(cmd).splitlines()[0] + epoch, nvra = out.split(':') #pylint: disable=E1103 + if not epoch or epoch.lower() == "(none)": + epoch = "0" + return ( epoch == getattr(pkg, 'up_epoch' if newer else 'epoch') + and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER")) + except CalledProcessError: + return False + +def write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir=''): """ Writes package database into a file named DB_BACKUP_FILE. """ with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file: - pickle.dump((datetime.datetime.now(), pkgdb), db_file) + data = (datetime.datetime.now(), safe_pkgs, dangerous_pkgs) + pickle.dump(data, db_file) def load_pkgdb(cache_dir=''): """ This is inverse function to _write_pkgdb(). - @return package database loaded from file + @return (safe, dangerous) package lists loaded from file """ with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file: - date_time, pkgdb = pickle.load(db_file) + _, safe, dangerous = pickle.load(db_file) #print "Loaded package database from: %s" % date_time - return pkgdb + return safe, dangerous -def get_pkg_database(force_update=False, use_cache=True, - cache_dir='', - repolist=[]): +def make_dangerous_list(installed, repolist=None): """ - Checks yum database for available packages, that have at least two - different versions in repositories. Only not installed ones with - all of their dependencies intalled are selected. - And from those, few of the smallest are downloaded as rpms. - @return list of instances of Package of selected packages + This makes a list of instances of Package for dangerous tests. """ - if ( use_cache and not force_update - and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))): - pkgdb = load_pkgdb(cache_dir) - valid_db = True - for pkg in pkgdb: - if ( not rpm_exists(pkg, cache_dir, False) - or not rpm_exists(pkg, cache_dir, True)): - valid_db = False - #print "Old package database is not valid" - break - if valid_db: - return pkgdb - #print "Getting installed packages" - installed = set(check_output( #pylint: disable=E1103 - ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines()) #print "Getting all available packages" - avail_str = run_yum('list', 'available', '--showduplicates', + avail_str = util.run_yum('list', 'available', '--showduplicates', repolist=repolist) # list of lists of packages with the same name, longer than 2 #print "Finding duplicates" @@ -485,11 +454,88 @@ def get_pkg_database(force_update=False, use_cache=True, number_of_packages=MAX_PKG_DB_SIZE*5, repolist=repolist) #print "Selecting the smallest ones" - pkgdb = _sorted_db_by_size(selected, repolist=repolist) + return _sorted_db_by_size(selected, repolist=repolist) + +def make_safe_list(installed, exclude=None): + """ + Makes list of installed packages for non-dangerous tests. + @param installed is a list of installed packages names + @param exclude is a list of package names, that won't appear in result + """ + if exclude is None: + exclude = set() + base = list(installed) + random.shuffle(base) + res = [] + i = 0 + while len(res) < MAX_PKG_DB_SIZE and i < len(base): + name = base[i] + i += 1 + if name in exclude: + continue + cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", name ] + envra = check_output(cmd).splitlines()[0] + res.append(_match2pkg(util.RE_ENVRA.match(envra), safe=True)) + return res + +def get_pkg_database( + force_update=False, + use_cache=True, + dangerous=False, + cache_dir='', + repolist=None): + """ + Checks yum database for available packages, that have at least two + different versions in repositories. Only not installed ones with + all of their dependencies intalled are selected. + And from those, few of the smallest are downloaded as rpms. + @param dangerous whether to load available, not installed + packages for dangerous tests + @return (safe, dangerous) + where + safe is a list of instances of Package, representing installed + software, these should be used for not-dangerous tests; + both older + dangerous is a list of instances of Package of selected packages, + that are not installed, but available; instances contain + both newer and older version of package + """ + dangerous_pkgs = [] + if ( use_cache and not force_update + and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))): + safe_pkgs, dangerous_pkgs = load_pkgdb(cache_dir) + valid_db = True + if len(safe_pkgs) < MAX_PKG_DB_SIZE: + valid_db = False + elif not dangerous and len(dangerous_pkgs) > 0: + dangerous_pkgs = [] + elif dangerous and len(dangerous_pkgs) == 0: + valid_db = False + else: + for pkg in safe_pkgs: + if not is_pkg_installed(pkg): + valid_db = False + break + for pkg in dangerous_pkgs: + if ( not rpm_exists(pkg, cache_dir, False) + or not rpm_exists(pkg, cache_dir, True)): + valid_db = False + #print "Old package database is not valid" + break + if valid_db: + return (safe_pkgs, dangerous_pkgs) + #print "Getting installed packages" + installed = set(check_output( #pylint: disable=E1103 + ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines()) + if dangerous: + dangerous_pkgs = make_dangerous_list(installed) + safe_pkgs = make_safe_list(installed, exclude=set( + pkg.name for pkg in dangerous_pkgs)) + if use_cache: - repolist = _get_repo_list() - _download_pkgdb(repolist, pkgdb, cache_dir) + repolist = _get_repo_list() if repolist in (None, []) else repolist + _download_dangerous(repolist, dangerous_pkgs, cache_dir) #print "Backing up database information" - write_pkgdb(pkgdb, cache_dir) - return pkgdb + write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir) + return (safe_pkgs, dangerous_pkgs) diff --git a/src/software/test/run.py b/src/software/test/run.py index 950b0c9..00ef304 100755 --- a/src/software/test/run.py +++ b/src/software/test/run.py @@ -68,6 +68,7 @@ def parse_cmd_line(): parser.add_argument("-p", "--password", default=os.environ.get("LMI_CIMOM_PASSWORD", ''), help="User's password.") + dangerous_group = parser.add_mutually_exclusive_group() dangerous_group.add_argument("--run-dangerous", action="store_true", default=(os.environ.get('LMI_RUN_DANGEROUS', '0') == '1'), @@ -78,6 +79,17 @@ def parse_cmd_line(): dest="run_dangerous", default=os.environ.get('LMI_RUN_DANGEROUS', '0') == '1', help="Disable dangerous tests.") + + tedious_group = parser.add_mutually_exclusive_group() + tedious_group.add_argument("--run-tedious", action="store_true", + default=(os.environ.get('LMI_RUN_TEDIOUS', '0') == '1'), + help="Run also tedious (long running) for this machine." + " Overrides environment variable LMI_RUN_TEDIOUS.") + tedious_group.add_argument('--no-tedious', action="store_false", + dest="run_tedious", + default=os.environ.get('LMI_RUN_TEDIOUS', '0') == '1', + help="Disable tedious tests.") + cache_group = parser.add_mutually_exclusive_group() cache_group.add_argument("-c", "--use-cache", action="store_true", default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'), @@ -160,6 +172,8 @@ def prepare_environment(args): os.environ['LMI_CIMOM_PASSWORD'] = args.password os.environ['LMI_RUN_DANGEROUS'] = ( '1' if args.run_dangerous else '0') + os.environ["LMI_RUN_TEDIOUS"] = ( + '1' if args.run_tedious else '0') os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0' if args.use_cache: os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR @@ -270,9 +284,12 @@ def main(): repolist = args.test_repos.split(',') else: repolist = [] - rpmcache.get_pkg_database(args.force_update, args.use_cache, - CACHE_DIR, repolist=repolist) - #rpmcache.make_pkg_database(packages + rpmcache.get_pkg_database( + args.force_update, + args.use_cache, + dangerous=args.run_dangerous, + cache_dir=CACHE_DIR, + repolist=repolist) prepare_environment(args) test_program = unittest.main(argv=ut_args, testLoader=LMITestLoader(), exit=False) diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py new file mode 100755 index 0000000..dc798d8 --- /dev/null +++ b/src/software/test/test_hosted_software_collection.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Unit tests for LMI_MemberOfSoftwareCollection provider. +""" + +import pywbem +import socket +import unittest + +import base + +class TestHostedSoftwareCollection(base.SoftwareBaseTestCase): + """ + Basic cim operations test. + """ + + CLASS_NAME = "LMI_HostedSoftwareCollection" + KEYS = ("Antecedent", "Dependent") + + def make_op(self): + """ + @param ses SoftwareElementState property value + @return object path of SoftwareIdentity + """ + objpath = self.objpath.copy() + objpath["Antecedent"] = pywbem.CIMInstanceName( + classname="Linux_ComputerSystem", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "CreationClassName" : "Linux_ComputerSystem", + "Name" : socket.gethostname() + })) + objpath["Dependent"] = pywbem.CIMInstanceName( + classname="LMI_SystemSoftwareCollection", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "InstanceID" : "LMI:SystemSoftwareCollection" + })) + return objpath + + def test_get_instance(self): + """ + Tests GetInstance call on packages from our rpm cache. + """ + objpath = self.make_op() + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertIsInstance(inst, pywbem.CIMInstance) + self.assertEqual(inst.path, objpath) + self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS)) + self.assertEqual(objpath, inst.path) + for key in self.KEYS: + self.assertEqual(inst[key], inst.path[key]) + + # try with CIM_ prefix + antecedent = objpath["Antecedent"].copy() + objpath["Antecedent"].classname = "CIM_ComputerSystem" + + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertIsInstance(inst, pywbem.CIMInstance) + objpath["Antecedent"] = antecedent.copy() + self.assertEqual(objpath, inst.path) + self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS)) + for key in self.KEYS: + self.assertEqual(inst[key], inst.path[key]) + + # try with CIM_ prefix also for CreationClassName + objpath["Antecedent"]["CreationClassName"] = "CIM_ComputerSystem" + + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertIsInstance(inst, pywbem.CIMInstance) + objpath["Antecedent"] = antecedent.copy() + self.assertEqual(objpath, inst.path) + self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS)) + for key in self.KEYS: + self.assertEqual(inst[key], inst.path[key]) + + def test_enum_instances(self): + """ + Tests EnumInstances call on installed packages. + """ + objpath = self.make_op() + insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) + self.assertEqual(1, len(insts)) + self.assertEqual(objpath, insts[0].path) + self.assertEqual(sorted(insts[0].properties.keys()), sorted(self.KEYS)) + self.assertEqual(objpath, insts[0].path) + for key in self.KEYS: + self.assertEqual(insts[0][key], insts[0].path[key]) + + def test_enum_instance_names(self): + """ + Tests EnumInstanceNames call on installed packages. + """ + objpath = self.make_op() + inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) + self.assertEqual(1, len(inames)) + self.assertEqual(objpath, inames[0]) + + def test_get_antecedent_referents(self): + """ + Test ReferenceNames for ComputerSystem. + """ + objpath = self.make_op() + refs = self.conn.ReferenceNames( + Role="Antecedent", + ObjectName=objpath["Antecedent"]) + self.assertGreater(len(refs), 0) + refs = [ r for r in refs if "Dependent" in r + and r["Dependent"].classname == \ + "LMI_SystemSoftwareCollection" ] + self.assertEqual(1, len(refs)) + ref = refs[0] + self.assertEqual(ref, objpath) + + @base.mark_dangerous + def test_get_dependent_referents(self): + """ + Test ReferenceNames for SystemSoftwareCollection. + """ + objpath = self.make_op() + refs = self.conn.AssociatorNames( + ObjectName=objpath["Dependent"], + Role="Dependent", + ResultRole="Antecedent", + ResultClass="Linux_ComputerSystem") + self.assertEqual(1, len(refs)) + ref = refs[0] + self.assertEqual(objpath["Antecedent"], ref) + +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestHostedSoftwareCollection) + +if __name__ == '__main__': + unittest.main() diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py new file mode 100755 index 0000000..4d375c4 --- /dev/null +++ b/src/software/test/test_member_of_software_collection.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Unit tests for LMI_MemberOfSoftwareCollection provider. +""" + +import pywbem +import unittest + +import base + +class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase): + """ + Basic cim operations test. + """ + + CLASS_NAME = "LMI_MemberOfSoftwareCollection" + KEYS = ("Collection", "Member") + + def make_op(self, pkg, newer=True): + """ + @return object path of MembeOfSoftwareCollection association + """ + objpath = self.objpath.copy() + objpath["Collection"] = pywbem.CIMInstanceName( + classname="LMI_SystemSoftwareCollection", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "InstanceID" : "LMI:SystemSoftwareCollection" + })) + objpath["Member"] = pywbem.CIMInstanceName( + classname="LMI_SoftwareIdentity", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "InstanceID" : 'LMI:PKG:' + pkg.get_nevra(newer=newer, + with_epoch="ALWAYS") + })) + return objpath + + def test_get_instance(self): + """ + Tests GetInstance call on packages from our rpm cache. + """ + for pkg in self.safe_pkgs: + objpath = self.make_op(pkg) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertEqual(inst.path, objpath, + "Object paths should match for package %s"%pkg) + for key in self.KEYS: + self.assertTrue(inst.properties.has_key(key), + "OP is missing \"%s\" key for package %s"%(key, pkg)) + self.assertEqual(objpath, inst.path) + + @base.mark_tedious + def test_enum_instance_names(self): + """ + Tests EnumInstanceNames call on installed packages. + """ + inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) + self.assertGreater(len(inames), 0) + objpath = self.make_op(self.safe_pkgs[0]) + for iname in inames: + self.assertIsInstance(iname, pywbem.CIMInstanceName) + self.assertEqual(iname.namespace, 'root/cimv2') + self.assertEqual(iname.classname, self.CLASS_NAME) + self.assertEqual(sorted(iname.keys()), sorted(self.KEYS)) + self.assertEqual(objpath["Collection"], iname["Collection"]) + nevra_set = set(i["Member"]["InstanceID"] for i in inames) + for pkg in self.safe_pkgs: + nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS") + self.assertTrue(nevra in nevra_set, + 'Missing nevra "%s".' % nevra) + +# @base.mark_tedious +# def test_enum_instances(self): +# """ +# Tests EnumInstances call on installed packages. +# """ +# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) +# self.assertGreater(len(insts), 0) +# for inst in insts: +# self.assertIsInstance(inst, pywbem.CIMInstance) +# self.assertEqual(inst.namespace, 'root/cimv2') +# self.assertEqual(inst.classname, self.CLASS_NAME) +# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS)) +# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"]) +# nevra_set = set(i["Member"]["InstanceID"] for i in insts) +# for pkg in self.safe_pkgs: +# self.assertIn(pkg.get_nevra(with_epoch="ALWAYS"), nevra_set) + + @base.mark_tedious + def test_get_collection_referents(self): + """ + Test ReferenceNames for SystemSoftwareCollection. + """ + objpath = self.make_op(self.safe_pkgs[0]) + refs = self.conn.AssociatorNames( + Role="Collection", + ObjectName=objpath["Collection"], + ResultRole="Member", + ResultClass="LMI_SoftwareIdentity") + self.assertGreater(len(refs), 0) + for ref in refs: + self.assertIsInstance(ref, pywbem.CIMInstanceName) + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(sorted(ref.keys()), ["InstanceID"]) + self.assertTrue(ref["InstanceID"].startswith("LMI:PKG:")) + nevra_set = set(i["InstanceID"] for i in refs) + # NOTE: installed packages might not be available + for pkg in self.safe_pkgs: + nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS") + self.assertTrue(nevra in nevra_set, + 'Missing nevra "%s".' % nevra) + + def test_get_member_referents(self): + """ + Test ReferenceNames for SoftwareIdentity. + """ + for pkg in self.safe_pkgs: + objpath = self.make_op(pkg) + refs = self.conn.AssociatorNames( + ObjectName=objpath["Member"], + Role="Member", + ResultRole="Collection", + ResultClass="LMI_SystemSoftwareCollection") + self.assertEqual(len(refs), 1) + ref = refs[0] + self.assertEqual(objpath["Collection"], ref) + +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestMemberOfSoftwareCollection) + +if __name__ == '__main__': + unittest.main() diff --git a/src/software/test/test_software_file_check.py b/src/software/test/test_software_file_check.py deleted file mode 100755 index 7b97ee9..0000000 --- a/src/software/test/test_software_file_check.py +++ /dev/null @@ -1,527 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2012 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar <miminar@redhat.com> -# -""" -Unit tests for LMI_SoftwareFileCheck provider. -""" - -import hashlib -import os -import pywbem -import re -import subprocess -import unittest -from collections import namedtuple - -import common - -RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*') - -PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103 - "exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime") - -class TestSoftwareFileCheck(common.SoftwareBaseTestCase): - #pylint: disable=R0904 - """ - Basic cim operations test. - """ - - CLASS_NAME = "LMI_SoftwareFileCheck" - KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState" - , "TargetOperatingSystem", "Version") - - hash_num2algo = { - 1 : hashlib.md5, #pylint: disable=E1101 - 2 : hashlib.sha1, #pylint: disable=E1101 - 8 : hashlib.sha256, #pylint: disable=E1101 - 9 : hashlib.sha384, #pylint: disable=E1101 - 10 : hashlib.sha512, #pylint: disable=E1101 - 11 : hashlib.sha224 #pylint: disable=E1101 - } - - hash_num2cmd = { - 1 : "md5sum", - 2 : "sha1sum", - 8 : "sha256sum", - 9 : "sha384sum", - 10 : "sha512sum", - 11 : "sha224sum" - } - - hash_num2length = { - 1 : 32, - 2 : 40, - 8 : 64, - 9 : 96, - 10 : 128, - 11 : 56 - } - - def make_op(self, pkg, filename, newer=True): - """ - @return object path of LMI_SoftwareFileCheck - """ - objpath = self.objpath.copy() - objpath["Name"] = filename - objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver") - objpath["CheckID"] = "%s#%s" % (pkg.name, filename) - objpath["SoftwareElementState"] = pywbem.Uint16(2) - objpath["TargetOperatingSystem"] = pywbem.Uint16(36) - objpath["SoftwareElementID"] = pkg.get_nevra(newer) - return objpath - - def assertEqualSEID(self, id1, id2, msg=None): - """ - This is for comparison of SoftwareElementID property values. - """ - if ( not isinstance(id1, basestring) - or not isinstance(id2, basestring)): - return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg) - match1 = common.RE_NEVRA.match(id1) - match2 = common.RE_NEVRA.match(id2) - if not match1 or not match2: - return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg) - if any( match1.group(g) != match2.group(g) - for g in ('name', 'ver', 'rel', 'arch')): - return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg) - epoch1 = match1.group('epoch') - epoch2 = match2.group('epoch') - if not epoch1: - epoch1 = '0' - if not epoch2: - epoch2 = '0' - if epoch1 != epoch2: - return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg) - return True - - def assertEqual(self, op1, op2, msg=None): - """ - This is override for object paths, that allows some differences - (like missing epoch in SoftwareElementID). - """ - if ( not isinstance(op1, pywbem.CIMInstanceName) - or not isinstance(op2, pywbem.CIMInstanceName) - or op1.classname != op2.classname - or op1.namespace != op2.namespace - or op1.keybindings.keys() != op2.keybindings.keys() - or any(op1[a] != op2[a] for a in - ( 'Name', 'Version', 'CheckID', 'SoftwareElementState' - , 'TargetOperatingSystem'))): - return common.SoftwareBaseTestCase.assertEqual(self, op1, op2, msg) - return self.assertEqualSEID( - op1['SoftwareElementID'], op2['SoftwareElementID'], msg) - - def make_checksum_str(self, csumnum, filename): - """ - @return checksum of installed file - """ - return RE_CHECKSUM.match(subprocess.check_output([ - self.hash_num2cmd[csumnum], filename])).group(1).lower() - - def do_check_symlink(self, pkg, filepath, inst): - """ - Assert some details about symlink. - """ - target = os.readlink(filepath) - stats = os.lstat(filepath) - - self.assertEqual(pywbem.Uint16(3), inst["FileType"], - "Unexpected file type of symlink for %s:%s"%( - pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst["FileUserID"], - "Unexpected uid of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst["FileGroupID"], - "Unexpected gid of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst["FileMode"], - "Unexpected mode of symlink for %s:%s" %(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst["FileSize"], - "Unexpected size of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(target, inst["LinkTarget"], - "Unexpected size of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]], - inst["FileChecksum"], - "Unexpected hash of symlink for %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"], - "Unexpected mtime of symlink for %s:%s"%(pkg.name, filepath)) - - # modify owner - prev_user = inst["FileUserID"] - #prev_mtime = inst["LastModificationTime"] - #prev_pflags = PassedFlags(*inst["PassedFlags"]) - os.lchown(filepath, stats.st_uid + 1, -1) - inst = self.conn.GetInstance(InstanceName=inst.path) - self.assertEqual(inst["ExpectedFileUserID"] + 1, inst["FileUserID"], - "Unexpected uid of modified symlink for %s:%s"%( - pkg.name, filepath)) - self.assertEqual(prev_user + 1, inst["FileUserID"], - "Unexpected uid of modified symlink for %s:%s"%( - pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst["FileGroupID"], - "Unexpected gid of modified symlink for %s:%s"%( - pkg.name, filepath)) - cur_pflags = PassedFlags(*inst["PassedFlags"]) - #self.assertGreater(inst["LastModificationTime"], prev_mtime) - - self.assertTrue(cur_pflags.exists, - "Symlink %s:%s should exist." %(pkg.name, filepath)) - self.assertTrue(cur_pflags.type, - "File type should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.size, - "File size should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.mode, - "File mode should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.checksum, - "File checksum should match for symlink %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.dev, - "Device number should match for symlink %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.ltarget, - "Link target should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.uid, - "Uid should not match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.gid, - "Gid shoud match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.mtime, - "Mtime should match for symlink %s:%s"%(pkg.name, filepath)) - - # modify link_target - os.remove(filepath) - os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath) - os.lchown(filepath, inst["ExpectedFileUserID"], - inst["ExpectedFileGroupID"]) - - inst = self.conn.GetInstance(InstanceName=inst.path) - cur_pflags = PassedFlags(*inst["PassedFlags"]) - self.assertGreater(len(inst["LinkTarget"]), - len(inst["ExpectedLinkTarget"])) - - self.assertTrue(cur_pflags.exists, - "File %s:%s should exist"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.type, - "File type should match for symlink %s:%s"%(pkg.name, filepath)) - # file size not checked for symlinks - #self.assertFalse(cur_pflags.size, - #"File size should not match for symlink %s:%s"%( - #pkg.name, filepath)) - self.assertTrue(cur_pflags.mode, - "File mode should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.checksum, - "Checksum should match for symlink %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.dev, - "Device numbers should match for symlink %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.ltarget, - "Link target should not match for symlink %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.uid, - "File uid should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.gid, - "File gid should match for symlink %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.mtime, - "File mtime should match for symlink %s:%s"%(pkg.name, filepath)) - - def do_check_directory(self, pkg, filepath, inst): - """ - Assert some details about directory. - """ - stats = os.lstat(filepath) - - self.assertEqual(pywbem.Uint16(2), inst["FileType"], - "Unexpected type for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst["FileUserID"], - "Unexpected uid for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst["FileGroupID"], - "Unexpected gid for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst["FileMode"], - "Unexpected mode for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst["FileSize"], - "Unexpected size for directory %s:%s"%(pkg.name, filepath)) - self.assertIs(inst["LinkTarget"], None) - self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]], - inst["FileChecksum"], - "Unexpected checksum for directory %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"], - "Unexpected mtime for directory %s:%s"%(pkg.name, filepath)) - - def do_check_file(self, pkg, filepath, inst): - """ - Assert some details about regurar file. - """ - stats = os.lstat(filepath) - - self.assertEqual(pywbem.Uint16(1), inst["FileType"], - "Unexpected file type for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_uid, inst["FileUserID"], - "Unexpected file uid for %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_gid, inst["FileGroupID"], - "Unexpected gid for regular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_mode, inst["FileMode"], - "Unexpected mode for reqular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(stats.st_size, inst["FileSize"], - "Unexpected size for reqular file %s:%s"%(pkg.name, filepath)) - self.assertIs(inst["LinkTarget"], None) - csum = self.make_checksum_str(inst['FileChecksumType'], filepath) - self.assertEqual(csum, inst["FileChecksum"].lower(), - "Unexpected checksum for reqular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(inst["LastModificationTime"], - inst["ExpectedLastModificationTime"], - "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath)) - self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"], - "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath)) - - # make it longer - with open(filepath, "a+") as fobj: - fobj.write("data\n") - inst = self.conn.GetInstance(InstanceName=inst.path) - cur_pflags = PassedFlags(*inst["PassedFlags"]) - self.assertGreater(inst["FileSize"], inst["ExpectedFileSize"], - "File size should be greater, then expected for reqular file" - " %s:%s"%(pkg.name, filepath)) - self.assertGreater(inst["LastModificationTime"], - inst["ExpectedLastModificationTime"], - "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath)) - - self.assertTrue(cur_pflags.exists, - "Regular file should exist %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.type, - "Type of regular file should match for %s:%s"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.size, - "Size should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.mode, - "Mode should match for regular file %s:%s"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.checksum, - "Checksum should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.dev, - "Device number should match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.ltarget, - "Link target should match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.uid, - "File uid should match for %s:%s"%(pkg.name, filepath)) - self.assertTrue(cur_pflags.gid, - "File gid should match for %s:%s"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.mtime, - "File mtime should not match for %s:%s"%(pkg.name, filepath)) - - # change file type - os.remove(filepath) - os.symlink(filepath, filepath) - os.lchown(filepath, inst["ExpectedFileUserID"], - inst["ExpectedFileGroupID"]) - inst = self.conn.GetInstance(InstanceName=inst.path) - cur_pflags = PassedFlags(*inst["PassedFlags"]) - self.assertNotEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"], - "Link target should not match for %s:%s"%(pkg.name, filepath)) - self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"], - "File size should not match for %s:%s"%(pkg.name, filepath)) - self.assertGreater(inst["LastModificationTime"], - inst["ExpectedLastModificationTime"], - "File mtime should be greater than expected for %s:%s"%( - pkg.name, filepath)) - self.assertNotEqual(inst["ExpectedFileType"], inst["FileType"], - "File type should not match for %s:%s"%(pkg.name, filepath)) - self.assertEqual(pywbem.Uint16(3), inst["FileType"], - "File type should match for %s:%s"%(pkg.name, filepath)) - - self.assertTrue(cur_pflags.exists, - "Regular file %s:%s should exist"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.type, - "Regular file type should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.size, - "Size should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.mode, - "File mode should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.checksum, - "Checksum should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.dev, - "Device should match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.ltarget, - "Link target should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.uid, - "Regular file's uid should match for %s:%s"%( - pkg.name, filepath)) - self.assertTrue(cur_pflags.gid, - "Regular file's gid should match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.mtime, - "Regular file's mtime should not match for %s:%s"%( - pkg.name, filepath)) - - # remove it - os.remove(filepath) - inst = self.conn.GetInstance(InstanceName=inst.path) - cur_pflags = PassedFlags(*inst["PassedFlags"]) - self.assertEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"], - "Link target does not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"], - "File size should not match for regular file %s:%s"%( - pkg.name, filepath)) - self.assertIsNone(inst["LastModificationTime"]) - self.assertIsNone(inst["FileType"]) - self.assertIsNone(inst["FileChecksum"]) - self.assertIsNone(inst["FileMode"]) - self.assertIsNone(inst["FileUserID"]) - self.assertIsNone(inst["FileGroupID"]) - - self.assertFalse(cur_pflags.exists, - "Regular file %s:%s should not exist"%(pkg.name, filepath)) - self.assertFalse(cur_pflags.type, - "Regular file's type should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.size, - "Regular file's size should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.mode, - "Regular file's mode should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.checksum, - "Regular file's checksum should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.dev, - "Regular file's dev number should not match %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.ltarget, - "Regular file's link target should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.uid, - "Regular file's uid should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.gid, - "Regular file's guid should not match for %s:%s"%( - pkg.name, filepath)) - self.assertFalse(cur_pflags.mtime, - "Regular file's mtime should not match for %s:%s"%( - pkg.name, filepath)) - - @common.mark_dangerous - def test_get_instance(self): - """ - Tests GetInstance call. - """ - for pkg in self.pkgdb: - files = self.pkg_files[pkg.name] - if ( common.is_installed(pkg) - and not common.verify_pkg(pkg.name)): - common.remove_pkg(pkg.name) - if not common.is_installed(pkg.name): - self.install_pkg(pkg) - self.assertTrue(common.is_installed(pkg), - "Package %s must be installed"%pkg) - - for filepath in files: - objpath = self.make_op(pkg, filepath) - - inst = self.conn.GetInstance( - InstanceName=objpath, - LocalOnly=False) - self.assertIsInstance(inst, pywbem.CIMInstance) - self.assertEqual(objpath, inst.path, - msg="Object paths of instance must match for %s:%s"%( - pkg.name, filepath)) - for key in self.KEYS: - if key.lower() == "softwareelementid": - self.assertEqualSEID(inst[key], objpath[key], - "OP key %s values should match for %s:%s"%( - key, pkg.name, filepath)) - elif key.lower() == "targetoperatingsystem": - self.assertIsInstance(objpath[key], (int, long)) - else: - self.assertEqual(objpath[key], inst[key], - "OP key %s values should match for %s:%s"%( - key, pkg.name, filepath)) - - self.assertTrue(inst["FileExists"], - "File %s:%s must exist"%(pkg.name, filepath)) - self.assertEqual(10, len(inst["PassedFlags"]), - "PassedFlags must have constant length") - for i, flag in enumerate(inst["PassedFlags"]): - self.assertTrue(flag is True, - "Flag \"%s\" should all match for file %s:%s"%( - inst["PassedFlagsDescriptions"][i], pkg.name, filepath)) - for prop in ( "FileType", "FileUserID", "FileGroupID" - , "FileMode", "FileSize", "LinkTarget" - , "FileChecksum", "FileModeFlags"): - if ( ( os.path.islink(filepath) - or (not os.path.isfile(filepath))) - and prop == "FileSize"): - continue - self.assertEqual(inst["Expected"+prop], inst[prop], - "%s should match for %s:%s"%(prop, pkg.name, filepath)) - if os.path.islink(filepath): - self.do_check_symlink(pkg, filepath, inst) - elif os.path.isdir(filepath): - self.do_check_directory(pkg, filepath, inst) - elif os.path.isfile(filepath): - self.do_check_file(pkg, filepath, inst) - - @common.mark_dangerous - def test_invoke_method(self): - """ - Tests Invoke method invocation. - """ - for pkg in self.pkgdb: - files = self.pkg_files[pkg.name] - if common.is_installed(pkg) and not common.verify_pkg(pkg.name): - common.remove_pkg(pkg.name) - if not common.is_installed(pkg.name): - self.install_pkg(pkg) - self.assertTrue(common.is_installed(pkg), - "Package %s must be installed"%pkg) - for filepath in files: - objpath = self.make_op(pkg, filepath) - - (rval, _) = self.conn.InvokeMethod( - MethodName="Invoke", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(0), rval, - msg="InvokeMethod should be successful for %s:%s"%( - pkg.name, filepath)) - - # modify file - if os.path.isfile(filepath): - os.remove(filepath) - else: - os.lchown(filepath, os.stat(filepath).st_uid + 1, -1) - (rval, _) = self.conn.InvokeMethod( - MethodName="Invoke", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(2), rval, - "InvokeMethod should not pass for modified file %s:%s"%( - pkg.name, filepath)) - -def suite(): - """For unittest loaders.""" - return unittest.TestLoader().loadTestsFromTestCase( - TestSoftwareFileCheck) - -if __name__ == "__main__": - unittest.main() diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py new file mode 100755 index 0000000..934d4c9 --- /dev/null +++ b/src/software/test/test_software_identity.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Unit tests for LMI_SoftwareIdentity provider. +""" + +from datetime import datetime, timedelta +import itertools +import pywbem +import unittest + +import base +import rpmcache +import util + +class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904 + """ + Basic cim operations test. + """ + + CLASS_NAME = "LMI_SoftwareIdentity" + KEYS = ("InstanceID", ) + + def make_op(self, pkg, newer=True): + """ + @param ses SoftwareElementState property value + @return object path of SoftwareIdentity + """ + objpath = self.objpath.copy() + objpath["InstanceID"] = 'LMI:PKG:'+pkg.get_nevra(newer, "ALWAYS") + return objpath + + @base.mark_dangerous + def test_get_instance(self): + """ + Dangerous test, making sure, that properties are set correctly + for installed and removed packages. + """ + for pkg in self.dangerous_pkgs: + if rpmcache.is_pkg_installed(pkg.name): + rpmcache.remove_pkg(pkg.name) + objpath = self.make_op(pkg) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertIsNone(inst["InstallDate"]) + time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \ + - timedelta(seconds=0.01) + rpmcache.install_pkg(pkg) + inst2 = self.conn.GetInstance( + InstanceName=objpath, LocalOnly=False) + self.assertIsNotNone(inst2["InstallDate"]) + self.assertGreater(inst2["InstallDate"].datetime, time_stamp) + + def test_get_instance_safe(self): + """ + Tests GetInstance call on packages from our rpm cache. + """ + for pkg in self.safe_pkgs: + objpath = self.make_op(pkg) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertEqual(inst.path, objpath, + "Object paths should match for package %s"%pkg) + for key in self.KEYS: + self.assertTrue(inst.properties.has_key(key), + "OP is missing \"%s\" key for package %s"%(key, pkg)) + self.assertIsInstance(inst["Caption"], basestring) + self.assertIsInstance(inst["Description"], basestring) + self.assertEqual(pkg.up_evra, inst["VersionString"], + "VersionString does not match evra for pkg %s" % pkg) + self.assertTrue(inst['IsEntity']) + self.assertEqual(pkg.name, inst["Name"], + "Name does not match for pkg %s" % pkg) + self.assertIsInstance(inst["Epoch"], pywbem.Uint32, + "Epoch does not match for pkg %s" % pkg) + self.assertEqual(int(pkg.epoch), inst["Epoch"]) + self.assertEqual(pkg.ver, inst["Version"], + "Version does not match for pkg %s" % pkg) + self.assertEqual(pkg.rel, inst["Release"], + "Release does not match for pkg %s" % pkg) + self.assertEqual(pkg.arch, inst["Architecture"], + "Architecture does not match for pkg %s" % pkg) + + # try to leave out epoch part + if pkg.epoch == "0": + op_no_epoch = objpath.copy() + op_no_epoch["SoftwareElementID"] = util.make_nevra(pkg.name, + pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER") + self.assertNotIn(":", op_no_epoch["SoftwareElementID"]) + inst = self.conn.GetInstance( + InstanceName=op_no_epoch, LocalOnly=False) + self.assertIn(inst.path, (objpath, op_no_epoch)) + + @base.mark_tedious + def test_enum_instance_names_safe(self): + """ + Tests EnumInstanceNames call on installed packages. + """ + inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) + self.assertGreater(len(inames), 0) + for iname in inames: + self.assertIsInstance(iname, pywbem.CIMInstanceName) + self.assertEqual(iname.namespace, 'root/cimv2') + self.assertEqual(sorted(iname.keys()), sorted(self.KEYS)) + nevra_set = set(i["InstanceID"] for i in inames) + for pkg in self.safe_pkgs: + self.assertIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"), + nevra_set) + +# @base.mark_tedious +# def test_enum_instances(self): +# """ +# Tests EnumInstances call on installed packages. +# """ +# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) +# self.assertGreater(len(insts), 0) +# for inst in insts: +# self.assertIsInstance(inst, pywbem.CIMInstance) +# self.assertEqual(inst.namespace, 'root/cimv2') +# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS)) +# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"]) +# nevra_set = set() +# name_set = set() +# for inst in insts: +# nevra_set.add(inst["InstanceID"]) +# name_set.add(inst["Name"]) +# for pkg in self.safe_pkgs: +# self.assertIn("LMI:PKG:"+pkg.get_nevra(with_epoch="ALWAYS"), +# nevra_set) +# self.assertIn(pkg.name, name_set) + +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestSoftwareIdentity) + +if __name__ == '__main__': + unittest.main() diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py deleted file mode 100755 index 93669f9..0000000 --- a/src/software/test/test_software_installed_package.py +++ /dev/null @@ -1,351 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2012 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar <miminar@redhat.com> -# -""" -Unit tests for LMI_SoftwareInstalledPackage provider. -""" - -import os -import pywbem -import socket -import stat -import unittest - -import common - -def make_path_tuple(objpath): - return tuple(objpath[a] for a in ("SoftwareElementID", "Name", "Version", - "SoftwareElementState")) - -class TestSoftwareInstalledPackage(common.SoftwareBaseTestCase): - """ - Basic cim operations test. - """ - - CLASS_NAME = "LMI_SoftwareInstalledPackage" - KEYS = ("Software", "System") - - def make_op(self, pkg, newer=True, ses=2): - """ - @param ses SoftwareElementState property value - @return object path of LMI_SoftwareInstaledPackage - """ - objpath = self.objpath.copy() - pkg_op = pywbem.CIMInstanceName( - classname="LMI_SoftwarePackage", namespace="root/cimv2", - keybindings={ - "Name" : pkg.name, - "SoftwareElementID" : pkg.get_nevra(newer, 'ALWAYS'), - "SoftwareElementState" : pywbem.Uint16(ses), - "TargetOperatingSystem" : pywbem.Uint16(36), - "Version" : getattr(pkg, 'up_ver' if newer else 'ver') }) - system_op = pywbem.CIMInstanceName( - classname="Linux_ComputerSystem", namespace="root/cimv2", - keybindings={ - "CreationClassName" : "Linux_ComputerSystem", - "Name" : socket.gethostname() }) - objpath["Software"] = pkg_op - objpath["System"] = system_op - return objpath - - @common.mark_dangerous - def test_get_instance(self): - """ - Tests GetInstance call on packages from our rpm cache. - TODO: test this in non-dangerous way - """ - for pkg in self.pkgdb: - if not common.is_installed(pkg): - self.install_pkg(pkg) - objpath = self.make_op(pkg) - inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) - self.assertIsSubclass(inst.path.classname, self.CLASS_NAME) - self.assertIsSubclass( - inst.path['System'].classname, objpath['System'].classname) - self.assertIsSubclass(inst.path['Software'].classname, - objpath['Software'].classname) - self.assertEqual(2, len(inst.path.keys())) - for key in self.KEYS: - self.assertEqual(inst[key], inst.path[key], - "Object paths should be the same for %s"%pkg) - self.assertEqual(inst['Software'], objpath['Software'], - "Software key reference should match for %s"%pkg) - common.remove_pkg(pkg.name) - objpath['Software']['SoftwareElementState'] = pywbem.Uint16(1) - with self.assertRaises(pywbem.CIMError) as cmngr: - self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) - self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0], - "Package %s should not be installed"%pkg) - - @common.mark_dangerous - def test_enum_instance_names(self): - """ - Tests EnumInstances call. - TODO: test this in non-dangerous way - """ - pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None - if pkg and common.is_installed(pkg.name): - common.remove_pkg(pkg.name) - insts1 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) - if pkg: - self.install_pkg(pkg) - insts2 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) - self.assertEqual(len(insts1) + 1, len(insts2)) - - if pkg: - objpath = self.make_op(pkg) - self.assertIn(make_path_tuple(objpath['Software']), - set(make_path_tuple(inst['Software']) for inst in insts2)) - self.assertTrue(all(isinstance(inst, pywbem.CIMInstanceName) - for inst in insts1)) - - @common.mark_dangerous - def test_enum_instances(self): - """ - Tests EnumInstances call. - """ - pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None - if pkg and not common.is_installed(pkg.name): - self.install_pkg(pkg.name) - insts1 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) - if pkg: - common.remove_pkg(pkg.name) - insts2 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) - objpath = self.make_op(pkg) - self.assertEqual(len(insts2) + 1, len(insts1)) - path_values = set(make_path_tuple(p["Software"]) for p in insts1) - self.assertIn(make_path_tuple(objpath['Software']), path_values) - path_values = set(make_path_tuple(p["Software"]) for p in insts2) - self.assertNotIn(make_path_tuple(objpath['Software']), path_values) - - self.assertTrue(all(inst['Software'] == inst.path['Software'] - for inst in insts1)) - self.assertTrue(all(inst['System'] == inst.path['System'] - for inst in insts1)) - - @common.mark_dangerous - def test_create_instance(self): - """ - Tests CreateInstance call. - """ - for pkg in self.pkgdb: - if common.is_installed(pkg.name): - common.remove_pkg(pkg.name) - self.assertFalse(common.is_installed(pkg.name), - "Package %s should not be installed"%pkg) - objpath = self.make_op(pkg, ses=1) - inst = pywbem.CIMInstance(classname=objpath.classname, path=objpath) - inst["Software"] = objpath["Software"] - inst["System"] = objpath["System"] - iname = self.conn.CreateInstance(NewInstance=inst) - self.assertIsInstance(iname, pywbem.CIMInstanceName) - objpath["Software"]["SoftwareElementState"] = pywbem.Uint16(2) - self.assertEqual(iname["Software"], objpath["Software"], - "Software key reference should match for %s"%pkg) - self.assertIsInstance(iname["System"], pywbem.CIMInstanceName) - self.assertIsSubclass(iname["System"].classname, - objpath["System"].classname) - self.assertEqual(set(iname.keys()), set(objpath.keys()), - "Keys of object paths should be the same for %s"%pkg) - self.assertTrue(common.is_installed(pkg.name), - "Package %s should be installed"%pkg) - - with self.assertRaises(pywbem.CIMError) as cmngr: - self.conn.CreateInstance(NewInstance=inst) - self.assertEqual(pywbem.CIM_ERR_ALREADY_EXISTS, - cmngr.exception.args[0], - "Package %s should already be installed"%pkg) - - @common.mark_dangerous - def test_delete_instance(self): - """ - Tests DeleteInstance call. - """ - for pkg in self.pkgdb: - if not common.is_installed(pkg.name): - self.install_pkg(pkg) - self.assertTrue(common.is_installed(pkg.name), - "Package %s must be installed"%pkg) - objpath = self.make_op(pkg) - self.conn.DeleteInstance(objpath) - self.assertFalse(common.is_installed(pkg.name), - "Package %s must be uninstalled"%pkg) - with self.assertRaises(pywbem.CIMError) as cmngr: - self.conn.DeleteInstance(objpath) - self.assertIn(cmngr.exception.args[0], - [pywbem.CIM_ERR_FAILED, pywbem.CIM_ERR_NOT_FOUND], - "Package %s can not be uninstalled again"%pkg) - - @common.mark_dangerous - def test_check_integrity(self): - """ - Tests CheckIntegrity call. - TODO: test this in non-dangerous way - """ - for pkg in self.pkgdb: - files = self.pkg_files[pkg.name] - if ( ( common.is_installed(pkg) - and not common.verify_pkg(pkg.name)) - or ( common.is_installed(pkg.name) - and not common.is_installed(pkg))) : - common.remove_pkg(pkg.name) - if not common.is_installed(pkg.name): - self.install_pkg(pkg) - self.assertTrue(common.is_installed(pkg), - "Package %s must be installed"%pkg) - - objpath = self.make_op(pkg) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="CheckIntegrity", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(0), rval, - "IntegrityCheck should pass for %s"%pkg.name) # check passed - self.assertEqual(1, len(oparms)) - self.assertIn("Failed", oparms) - self.assertEqual(0, len(oparms["Failed"]), - "IntegrityCheck should not fail for %s"%pkg.name) - - cnt_bad = 0 - for file_path in files: - stats = os.lstat(file_path) - if os.path.islink(file_path): # modify symbolic link - target = os.readlink(file_path) - os.remove(file_path) - os.symlink(target, file_path) # just touch symlink - (rval, oparms) = self.conn.InvokeMethod( - MethodName="CheckIntegrity", - ObjectName=objpath) - # symlink must pass - self.assertEqual(cnt_bad, len(oparms["Failed"]), - "Symlink %s:%s should pass"%(pkg.name, file_path)) - os.remove(file_path) - # now change target - os.symlink("wrong_link_target", file_path) - elif os.path.isdir(file_path): # check directory - os.chmod(file_path, stats.st_mode) # just touch dir - (rval, oparms) = self.conn.InvokeMethod( - MethodName="CheckIntegrity", - ObjectName=objpath) - # dir must pass - self.assertEqual(cnt_bad, len(oparms["Failed"]), - "Directory %s:%s should pass"%(pkg.name, file_path)) - # modify read access of directory - os.chmod(file_path, stats.st_mode ^ stat.S_IROTH) - else: # modify regular file - # just touch file - this is enough to make it fail - with open(file_path, "w+"): - pass - cnt_bad += 1 - (rval, oparms) = self.conn.InvokeMethod( - MethodName="CheckIntegrity", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(1), rval, - "File %s:%s should not pass"%(pkg.name, file_path)) - self.assertEqual(1, len(oparms)) - self.assertIn("Failed", oparms) - self.assertEqual(len(oparms["Failed"]), cnt_bad, - "Number of errors not correct. Failed for %s:%s" % ( - pkg.name, file_path)) - self.assertIn(file_path, (p["Name"] for p in oparms["Failed"]), - "File %s:%s should also be in failed"%(pkg.name, file_path)) - - @common.mark_dangerous - def test_method_update(self): - """ - Tests Update method invocation. - """ - for pkg in self.pkgdb: - if ( common.is_installed(pkg.name) - and not common.is_installed(pkg, False)): - common.remove_pkg(pkg.name) - if not common.is_installed(pkg.name): - self.install_pkg(pkg, False) - self.assertTrue(common.is_installed(pkg, False), - "Package %s must be installed"%pkg.get_nevra(False)) - - objpath = self.make_op(pkg, False) - op_up = self.make_op(pkg) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Update", - ObjectName=objpath) - self.assertEqual(pywbem.Uint16(1), rval, - "Update should succed for %s"%pkg.get_nevra(False)) - self.assertEqual(1, len(oparms)) - self.assertIn("Installed", oparms) - self.assertEqual(oparms["Installed"], op_up["Software"], - "Object paths should match for %s"%pkg) - self.assertTrue(common.is_installed(pkg), - "Package %s must be installed"%pkg) - - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Update", - ObjectName=op_up) - self.assertEqual(pywbem.Uint16(0), rval, - "Package %s should be already updated"%pkg) - self.assertEqual(1, len(oparms)) - self.assertEqual(oparms["Installed"], op_up["Software"], - "Object paths should match for %s"%pkg) - self.assertTrue(common.is_installed(pkg.name), - "Package %s must be installed"%pkg) - self.assertFalse(common.is_installed(pkg, False), - "Older package %s must not be installed" % - pkg.get_nevra(False)) - - with self.assertRaises(pywbem.CIMError) as cmngr: - self.conn.InvokeMethod( - MethodName="Update", - ObjectName=objpath) - self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0], - "Older package %s should not be installed" % - pkg.get_nevra(False)) - - common.remove_pkg(pkg.name) - self.install_pkg(pkg, False) - self.assertTrue(common.is_installed(pkg, False)) - - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Update", - ObjectName=objpath, - Epoch=pkg.epoch, - Version=pkg.ver, - Release=pkg.rel) - self.assertEqual(pywbem.Uint16(0), rval, - "Update of package %s should succeed"%pkg) - self.assertEqual(oparms["Installed"], objpath["Software"], - "Object paths should be the same for package %s"%pkg) - - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Update", - ObjectName=objpath, - Epoch=pkg.up_epoch, - Version=pkg.up_ver, - Release=pkg.up_rel) - self.assertEqual(pywbem.Uint16(1), rval, - "Package %s can not be updated twice to highest version"%pkg) - self.assertEqual(oparms["Installed"], op_up["Software"], - "Object paths should match for package %s"%pkg) - -def suite(): - """For unittest loaders.""" - return unittest.TestLoader().loadTestsFromTestCase( - TestSoftwareInstalledPackage) - -if __name__ == "__main__": - unittest.main() diff --git a/src/software/test/test_software_package.py b/src/software/test/test_software_package.py deleted file mode 100755 index 4cd5da9..0000000 --- a/src/software/test/test_software_package.py +++ /dev/null @@ -1,153 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2012 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar <miminar@redhat.com> -# -""" -Unit tests for LMI_SoftwareInstalledPackage provider. -""" - -import pywbem -import unittest - -import common -import rpmcache - -class TestSoftwarePackage(common.SoftwareBaseTestCase): #pylint: disable=R0904 - """ - Basic cim operations test. - """ - - CLASS_NAME = "LMI_SoftwarePackage" - KEYS = ( "Name", "SoftwareElementID", "SoftwareElementState" - , "TargetOperatingSystem", "Version") - - def make_op(self, pkg, newer=True, ses=2): - """ - @param ses SoftwareElementState property value - @return object path of SoftwarePackage - """ - objpath = self.objpath.copy() - objpath["Name"] = pkg.name - objpath["SoftwareElementID"] = pkg.get_nevra(newer, "ALWAYS") - objpath["SoftwareElementState"] = pywbem.Uint16(ses) - objpath["TargetOperatingSystem"] = pywbem.Uint16(36) - objpath["Version"] = getattr(pkg, 'up_ver' if newer else 'ver') - return objpath - - @common.mark_dangerous - def test_get_instance(self): - """ - Tests GetInstance call on packages from our rpm cache. - TODO: test this in non-dangerous way - """ - for pkg in self.pkgdb: - if common.is_installed(pkg.name): - common.remove_pkg(pkg.name) - objpath = self.make_op(pkg, ses=1) - inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) - self.assertEqual(inst.path, objpath, - "Object paths should match for %s"%pkg) - for key in self.KEYS: - self.assertTrue(inst.properties.has_key(key), - "OP is missing \"%s\" key for package %s"%(key, pkg)) - if key == "TargetOperatingSystem": - self.assertIsInstance(inst.path[key], (int, long)) - else: - self.assertEqual(inst.path[key], inst[key], - "Object paths of instance should match for %s"%pkg) - self.assertEqual(pkg.up_rel, inst['Release'], - "Release property should match for %s"%pkg) - self.install_pkg(pkg) - objpath['SoftwareElementState'] = pywbem.Uint16(2) - inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) - self.assertEqual(inst.path, objpath, - "Object paths should match for %s"%pkg) - - # try to leave out epoch part - if pkg.epoch == "0": - op_no_epoch = objpath.copy() - op_no_epoch["SoftwareElementID"] = rpmcache.make_nevra(pkg.name, - pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER") - self.assertNotIn(":", op_no_epoch["SoftwareElementID"]) - inst = self.conn.GetInstance( - InstanceName=op_no_epoch, LocalOnly=False) - self.assertIn(inst.path, (objpath, op_no_epoch)) - - @common.mark_dangerous - def test_method_install(self): - """ - Tests Install method invocation. - """ - for pkg in self.pkgdb: - if common.is_installed(pkg.name): - common.remove_pkg(pkg.name) - objpath = self.make_op(pkg, ses=1) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Install", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(1), rval, - "Installation of %s should be successful"%pkg) - self.assertEqual(1, len(oparms)) - self.assertTrue(oparms.has_key('Installed')) - objpath['SoftwareElementState'] = pywbem.Uint16(2) - self.assertEqual(oparms['Installed'], objpath, - "Object paths should match for %s"%pkg) - self.assertTrue(common.is_installed(pkg.name), - "Package %s must be installed"%pkg) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Install", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(0), rval, - "Installation of %s should fail"%pkg) - self.assertEqual(len(oparms), 1) - self.assertEqual(oparms['Installed'], objpath, - "Object paths should match for %s"%pkg) - - @common.mark_dangerous - def test_method_remove(self): - """ - Tests Remove method invocation. - """ - for pkg in self.pkgdb: - if not common.is_installed(pkg.name): - self.install_pkg(pkg) - objpath = self.make_op(pkg) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Remove", - ObjectName=objpath) - self.assertEqual(pywbem.Uint16(1), rval, - "Removal of package should succeed"%pkg) - self.assertEqual(0, len(oparms)) - self.assertFalse(common.is_installed(pkg.name), - "Package %s should not be installed"%pkg) - objpath["SoftwareElementState"] = pywbem.Uint16(1) - (rval, oparms) = self.conn.InvokeMethod( - MethodName="Remove", - ObjectName=objpath) - self.assertEqual(pywbem.Uint32(0), rval, - "Removal of %s should fail"%pkg) - self.assertEqual(len(oparms), 0) - -def suite(): - """For unittest loaders.""" - return unittest.TestLoader().loadTestsFromTestCase( - TestSoftwarePackage) - -if __name__ == '__main__': - unittest.main() diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py new file mode 100755 index 0000000..193a4d5 --- /dev/null +++ b/src/software/test/test_system_software_collection.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Unit tests for LMI_SoftwareIdentity provider. +""" + +import unittest + +import base + +class TestSystemSoftwareCollection( + base.SoftwareBaseTestCase): #pylint: disable=R0904 + """ + Basic cim operations test. + """ + + CLASS_NAME = "LMI_SystemSoftwareCollection" + KEYS = ("InstanceID", ) + + def make_op(self): + """ + @param ses SoftwareElementState property value + @return object path of SoftwareIdentity + """ + objpath = self.objpath.copy() + objpath["InstanceID"] = "LMI:SystemSoftwareCollection" + return objpath + + def test_get_instance(self): + """ + Tests GetInstance call on packages from our rpm cache. + """ + objpath = self.make_op() + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + + self.assertEqual(objpath, inst.path) + self.assertEqual(list(objpath.keys()), list(inst.path.keys())) + for key in self.KEYS: + self.assertEqual(inst[key], objpath[key]) + self.assertIsInstance(inst["Caption"], basestring) + + def test_enum_instance_names(self): + """ + Tests EnumInstanceNames call on installed packages. + """ + inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) + self.assertEqual(len(inames), 1) + iname = inames[0] + self.assertEqual(iname, self.make_op()) + + def test_enum_instances(self): + """ + Tests EnumInstances call on installed packages. + """ + insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) + self.assertEqual(len(insts), 1) + inst = insts[0] + self.assertEqual(inst.path, self.make_op()) + self.assertEqual(inst["InstanceID"], inst.path["InstanceID"]) + self.assertIsInstance(inst["Caption"], basestring) + +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestSystemSoftwareCollection) + +if __name__ == '__main__': + unittest.main() diff --git a/src/software/test/util.py b/src/software/test/util.py new file mode 100644 index 0000000..da55e6a --- /dev/null +++ b/src/software/test/util.py @@ -0,0 +1,82 @@ +#!/usr/bin/python +# -*- Coding:utf-8 -*- +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. # +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Radek Novacek <rnovacek@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> + +""" +Common test utilities. +""" + +import re +from subprocess import check_output + +RE_NEVRA = re.compile( + r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)' + r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') +RE_NEVRA_OPT_EPOCH = re.compile( + r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' + r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') +RE_ENVRA = re.compile( + r'^(?P<epoch>\d+|\(none\)):(?P<name>.+)-(?P<ver>[^-]+)' + r'-(?P<rel>.+)\.(?P<arch>[^.]+)$') + +def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): + """ + @param with_epoch may be one of: + "NOT_ZERO" - include epoch only if it's not zero + "ALWAYS" - include epoch always + "NEVER" - do not include epoch at all + """ + estr = '' + if with_epoch.lower() == "always": + estr = epoch + elif with_epoch.lower() == "not_zero": + if epoch and epoch.lower() not in {"0", "(none)"}: + estr = epoch + if len(estr): + estr += ":" + return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) + +def make_evra(epoch, ver, rel, arch): + """ @return evra string """ + if not epoch or epoch.lower() == "(none)": + epoch = "0" + return "%s:%s-%s.%s" % (epoch, ver, rel, arch) + +def run_yum(*params, **kwargs): + """ + Runs yum with params and returns its output + It's here especially to allow pass a repolist argument, that + specifies list of repositories, to run the command on. + """ + cmd = ['yum'] + list(params) + repolist = kwargs.get('repolist', None) + if repolist is None: + repolist = [] + if repolist: + cmd += ['--disablerepo=*'] + cmd += ['--enablerepo='+r for r in repolist] + return check_output(cmd) + +def get_system_architecture(): + """ + @return the system architecture name as seen by rpm + """ + return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm']) + |