diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/software/test/README | 31 | ||||
-rw-r--r-- | src/software/test/common.py | 225 | ||||
-rw-r--r-- | src/software/test/rpmcache.py | 361 | ||||
-rwxr-xr-x | src/software/test/run_tests.py | 257 | ||||
-rwxr-xr-x | src/software/test/test_software_file_check.py | 261 | ||||
-rwxr-xr-x | src/software/test/test_software_installed_package.py | 278 | ||||
-rwxr-xr-x | src/software/test/test_software_package.py | 121 |
7 files changed, 1195 insertions, 339 deletions
diff --git a/src/software/test/README b/src/software/test/README index d60d1e8..a64382d 100644 --- a/src/software/test/README +++ b/src/software/test/README @@ -1,5 +1,26 @@ -Tests must be run as root on machine with sfcbd broker with -test dir as CWD. File common.py contains connection settings and -also test data. -This directory should contain all rpm packages, that will take part in testing -to speed it up. These packages are listed in common.py file. +Tests must be run as root on machine with sfcbd broker. +There are number of environment variables, that affect test running: + LMI_CIMOM_USERNAME + LMI_CIMOM_PASSWORD + LMI_CIMOM_BROKER + LMI_CIMOM_URL + LMI_RUN_DANGEROUS - valid values: 0, 1 + LMI_SOFTWARE_USE_CACHE - valid values: 0, 1 + - says, whether to use cache to download rpm packages, which will + be used to speed up testing + LMI_SOFTWARE_CACHE_DIR + - directory, where to store rpm packages and other information + - if not set and LMI_SOFTWARE_USE_CACHE == '1', a temporary + directory will be created and deleted, when tests are + complete + +Simple usage: + Each test module can be run as a stand-alone script or with run_tests.py + script: + $ ./run_tests.py -c --cache-dir=/var/tmp + To pass any arguments to underlying unittest.main() function, append them + after "--" swith like this: + $ ./run_tests.py -c --cache-dir=/var/tmp -- -v TestSoftwarePackage + All environment variables defined above can be overriden by command-line + arguments. + diff --git a/src/software/test/common.py b/src/software/test/common.py index f61480a..7726d3e 100644 --- a/src/software/test/common.py +++ b/src/software/test/common.py @@ -1,4 +1,4 @@ -# +# -*- encoding: utf-8 -*- # Copyright (C) 2012 Red Hat, Inc. All rights reserved. # # This library is free software; you can redistribute it and/or @@ -17,109 +17,143 @@ # # Authors: Michal Minar <miminar@redhat.com> # +""" +Common utilities and base class for all software tests. +""" -from collections import namedtuple import os import pywbem import re import subprocess +import tempfile import unittest +from subprocess import check_output + +import rpmcache -SCHEMA="http" -HOSTNAME="localhost" -PORT=5988 -USER='' -PASSWORD='' - -Package = namedtuple("Package", - "name, epoch, ver, rel, arch, " - "updatable, up_epoch, up_ver, up_rel") -# This is a database of packages used in tests. If you modify this, please -# ensure, that all corresponding rpm packages are present in test directory. -packages = [ Package(*args) for args in - ( ( "python-xlib-doc", "0", "0.15", "0.6.rc1.fc17", "noarch", False - , None, None, None) - , ( "slv2", "0", "0.6.6", "8.fc17", "x86_64", True - , "0", "0.6.6", "9.fc17") - ) ] - -pkg_files = ( - ( "bash-completion-1:2.0-1.fc17.noarch", - ( "/usr/share/bash-completion/completions/vgs" # symlink - , "/usr/share/doc/bash-completion-2.0/README" # file - , "/usr/share/doc/bash-completion-2.0" # directory - ) - , ) - , ) - -re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)' +RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') -def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): - """ - @param with_epoch may be one of: - "NOT_ZERO" - include epoch only if it's not zero - "ALWAYS" - include epoch always - "NEVER" - do not include epoch at all - """ - estr = '' - if with_epoch.lower() == "always": - estr = epoch - elif with_epoch.lower() == "not_zero": - if epoch != "0": - estr = epoch - if len(estr): estr += ":" - return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) - -def remove_pkg(name, *args): - subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", name]) - -def install_pkg(*args, **kwargs): - if len(args) > 1 or len(kwargs) > 0: - if kwargs.has_key('nevra'): - m = re_nevra.match(kwargs['nevra']) - args = [ m.group(k) for k in ( - 'name', 'epoch', 'ver', 'rel', 'arch') ] - nevra = make_nevra(*args, with_epoch="NEVER") - rpm_name = nevra + ".rpm" +def remove_pkg(pkg, *args): + """ + Remove package with rpm command. + @param pkg is either instance of Package or package name + @param args is a list of parameters for rpm command + """ + if isinstance(pkg, rpmcache.Package): + pkg = pkg.name + subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", pkg]) + +def install_pkg(pkg, newer=True): + """ + Install a specific package. + @param pkg is either package name or instance of Package + In latter case, a specific version is installed. + """ + if isinstance(pkg, rpmcache.Package): + rpm_name = pkg.get_nevra(newer) + ".rpm" if os.path.exists(rpm_name): subprocess.call(["rpm", "--quiet", "-i", rpm_name]) return - subprocess.call(["yum", "-q", "-y", "install", args[0]]) + pkg = pkg.name + subprocess.call(["yum", "-q", "-y", "install", pkg]) -def is_installed(*args, **kwargs): - if len(args) == 1: - return subprocess.call(["rpm", "--quiet", "-q", args[0]]) == 0 +def is_installed(pkg, newer=True): + """ + Check, whether package is installed. + Accepts the same parameters as install_pkg. + @see install_pkg + """ + if not isinstance(pkg, rpmcache.Package): + return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0 else: - if kwargs.has_key("nevra"): - nevra = kwargs["nevra"] - name = re_nevra.match(nevra).group('name') - else: - nevra = make_nevra(*args) - name = args[0] try: out = subprocess.check_output( - ["rpm", "-q", "--qf", "%{NEVRA}", name]) - return out == nevra + ["rpm", "-q", "--qf", "%{NEVRA}", pkg.name]) + return out == pkg.get_nevra(newer) except subprocess.CalledProcessError: return False def verify_pkg(name): + """ + @return output of command rpm, with verification output for package + """ return subprocess.call(["rpm", "--quiet", "-Va", name]) == 0 -class SoftwareBaseTestCase(unittest.TestCase): +def mark_dangerous(method): + """ + Decorator for methods of unittest.TestCase subclasses, that + skips dangerous tests, if environment variable does not allow them. + """ + if os.environ.get('LMI_RUN_DANGEROUS', '0') == '1': + return method + else: + return unittest.skip("This test is marked as dangerous.")(method) + +def is_config_file(pkg, file_path): + """ + @return True, if file_path is a configuration file of package pkg. + """ + out = check_output(['rpm', '-qc', pkg.name]) + return file_path in set(out.splitlines()) + +def is_doc_file(pkg, file_path): + """ + @return True, if file_path is a documentation file of package pkg. + """ + out = check_output(['rpm', '-qd', pkg.name]) + return file_path in set(out.splitlines()) + +def get_pkg_files(pkg): + """ + Tries to make a the heterogenous and smallest set of test files from + package. + @param pkg must be installed package + @return list of few files installed by pkg + """ + output = check_output(['rpm', '-ql', pkg.name]) + configs = set() + docs = set() + dirs = set() + files = set() + symlinks = set() + for fpath in output.splitlines(): + if ( len(dirs) == 0 + and not os.path.islink(fpath) + and os.path.isdir(fpath)): + dirs.add(fpath) + elif len(symlinks) == 0 and os.path.islink(fpath): + symlinks.add(fpath) + elif not os.path.islink(fpath) and os.path.isfile(fpath): + if len(configs) == 0 and is_config_file(pkg, fpath): + configs.add(fpath) + elif len(docs) == 0 and is_doc_file(pkg, fpath): + docs.add(fpath) + elif len(files) == 0: + files.add(fpath) + out = list(configs) + list(docs) + list(dirs) + list(symlinks) + if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0: + out += list(files) + return out + +class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 + """ + Base class for all LMI Software test classes. + """ + + CLASS_NAME = "Define in subclass" def setUp(self): unittest.TestCase.setUp(self) - self.url = "%s://%s:%d" % (SCHEMA, HOSTNAME, PORT) - self.conn = pywbem.WBEMConnection(self.url, (USER, PASSWORD)) - self.op = pywbem.CIMInstanceName( + self.objpath = pywbem.CIMInstanceName( namespace="root/cimv2", classname=self.CLASS_NAME) - def tearDown(self): - del self.conn - - def assertIsSubclass(self, cls, base_cls): + def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103 + """ + Checks, whether cls is subclass of base_cls from CIM perspective. + @param cls name of subclass + @param base_cls name of base class + """ if not isinstance(cls, basestring): raise TypeError("cls must be a string") if not isinstance(base_cls, basestring): @@ -127,3 +161,40 @@ class SoftwareBaseTestCase(unittest.TestCase): return self.assertTrue(pywbem.is_subclass(self.conn, "root/cimv2", base_cls, cls)) + @classmethod + def setUpClass(cls): + cls.url = os.environ.get('LMI_CIMOM_URL', 'http://localhost:5988') + cls.user = os.environ.get('LMI_CIMOM_USERNAME', '') + cls.password = os.environ.get('LMI_CIMOM_PASSWORD', '') + cls.broker = os.environ.get('LMI_CIMOM_BROKER', None) + cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password)) + cls.run_dangerous = ( + os.environ.get('LMI_RUN_DANGEROUS', '0') == '1') + use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1' + cls.cache_dir = None + if use_cache: + cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None) + if cls.cache_dir is None: + cls.cache_dir = tempfile.mkdtemp(suffix="software_database") + if cls.cache_dir: + cls.prev_dir = os.getcwd() + if not os.path.exists(cls.cache_dir): + os.makedirs(cls.cache_dir) + # rpm packages are expected to be in CWD + os.chdir(cls.cache_dir) + cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache) + for pkg in cls.pkgdb: + if not is_installed(pkg.name): + install_pkg(pkg) + cls.pkg_files = dict((pkg.name, get_pkg_files(pkg)) + for pkg in cls.pkgdb) + + @classmethod + def tearDownClass(cls): + if cls.run_dangerous: + for pkg in cls.pkgdb: + if is_installed(pkg.name): + remove_pkg(pkg.name) + if hasattr(cls, "prev_dir"): + os.chdir(cls.prev_dir) + diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py new file mode 100644 index 0000000..1e3a1ed --- /dev/null +++ b/src/software/test/rpmcache.py @@ -0,0 +1,361 @@ +#!/usr/bin/python +# -*- Coding:utf-8 -*- +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. # +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Radek Novacek <rnovacek@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +""" +Creation and manipulation utilities with rpm cache for software tests. +""" +import copy +import datetime +import os +import pickle +import re +from collections import defaultdict +from subprocess import call, check_output + +def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): + """ + @param with_epoch may be one of: + "NOT_ZERO" - include epoch only if it's not zero + "ALWAYS" - include epoch always + "NEVER" - do not include epoch at all + """ + estr = '' + if with_epoch.lower() == "always": + estr = epoch + elif with_epoch.lower() == "not_zero": + if epoch != "0": + estr = epoch + if len(estr): + estr += ":" + return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) + +class Package(object): #pylint: disable=R0902 + """ + Element of test package database. It's a container for package + informations. It contains two sets of versions for single package. + That's meant for updating tests. + """ + def __init__(self, name, epoch, ver, rel, arch, repo, + up_epoch, up_ver, up_rel, up_repo): + """ + Arguments prefixed with 'up_' are for newer package. + """ + self._name = name + self._epoch = epoch + self._ver = ver + self._rel = rel + self._arch = arch + self._repo = repo + self._up_epoch = up_epoch + self._up_ver = up_ver + self._up_rel = up_rel + self._up_repo = up_repo + + @property + def name(self): return self._name #pylint: disable=C0111,C0321 + @property + def epoch(self): return self._epoch #pylint: disable=C0111,C0321 + @property + def ver(self): return self._ver #pylint: disable=C0111,C0321 + @property + def rel(self): return self._rel #pylint: disable=C0111,C0321 + @property + def arch(self): return self._arch #pylint: disable=C0111,C0321 + @property + def repo(self): return self._repo #pylint: disable=C0111,C0321 + @property + def nevra(self): #pylint: disable=C0111,C0321 + return self.get_nevra(True) + + @property + def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321 + @property + def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321 + @property + def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321 + @property + def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321 + @property + def up_nevra(self): #pylint: disable=C0111,C0321 + return self.get_nevra(True) + + def get_nevra(self, newer=True, with_epoch='NOT_ZERO'): + """ + @newer if True, evr part is made from properties prefixed with 'up_' + @return pkg nevra string + """ + if newer: + attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch'] + else: + attrs = ['name', 'epoch', 'ver', 'rel', 'arch'] + return make_nevra(*[getattr(self, '_'+a) for a in attrs], + with_epoch=with_epoch) + +DB_BACKUP_FILE = 'lmi_software_test_cache' + +RE_AVAIL_PKG = re.compile( + r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' + r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' + r'-(?P<release>[a-zA-Z0-9_.]+)\s+' + r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE) +# this won't match the last entry, unless "package\n" is not appended +# at the end of the string +RE_PKG_DEPS = re.compile( + r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' + r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' + r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)' + r'(?=^package|\Z)', re.MULTILINE | re.DOTALL) +RE_DEPS_PROVIDERS = re.compile( + r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)' + r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)' + r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE) +RE_PKG_INFO = re.compile( + r'^Name\s*:\s*(?P<name>[^\s]+).*?' + r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?' + r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+' + r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?' + r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?', + re.MULTILINE | re.DOTALL | re.IGNORECASE) +RE_REPO = re.compile( + r'^[^\s]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE) + +# maximum number of packages, that will be selected for testing +MAX_PKG_DB_SIZE = 3 +# step used to iterate over package names used to check for thery dependencies +# it's a number of packages, that will be passed to yum command at once +PKG_DEPS_ITER_STEP = 50 + +def _match_nevr(match): + """ + @param match is a regexp match object with parsed rpm package + @return tuple (name, epoch, version, release) + """ + return ( match.group('name') + , match.group('epoch') + , match.group('version') + , match.group('release')) + +def _filter_duplicates(installed, avail_str): + """ + Parse output of "yum list available" command and retuns only those + packages occuring in multiple versions. + @param installed is a set of installed package names + @param avail_str yum command output + @return [ [pkg1v1, pkg1v2, ...], [pkg2v1, pkg2v2, ...], ... ] + Each sublist of result contain at least 2 elements, that are instances + of Package. + """ + m2pkg = lambda m: Package(m.group('name'), + m.group('epoch') if m.group('epoch') else '0', + m.group('version'), m.group('release'), + m.group('arch'), m.group('repository'), + None, None, None, None) + dups_list = [] + cur_package_matches = [] + prev_match = None + system_arch = get_system_architecture() + for match in RE_AVAIL_PKG.finditer(avail_str): + if ( _match_nevr(match) in [ _match_nevr(m) + for m in cur_package_matches] + or ( ( not prev_match + or prev_match.group('name') in + [m.group('name') for m in cur_package_matches]) + and match.group('arch') not in ('noarch', system_arch))): + continue + if prev_match and prev_match.group('name') != match.group('name'): + if ( len(cur_package_matches) > 1 + and not match.group('name') in installed): + pkgs = [ m2pkg(m) for m in cur_package_matches ] + dups_list.append(pkgs) + cur_package_matches = [] + cur_package_matches.append(match) + prev_match = match + if len(cur_package_matches) > 1: + dups_list.append([ m2pkg(m) for m in cur_package_matches ]) + return dups_list + +def _check_pkg_dependencies( + installed, + dup_list, + number_of_packages=MAX_PKG_DB_SIZE): + """ + Finds packages from dup_list with satisfied (installed) dependencies. + @param installed is a set of installed package names + @return filtered dup_list with at least number_of_packages elements. + """ + cmd = ['yum', 'deplist'] + dups_no_deps = [] + for i in range(0, len(dup_list), PKG_DEPS_ITER_STEP): + dups_part = dup_list[i:i+PKG_DEPS_ITER_STEP] + for dups in dups_part: + cmd.append(dups[0].name) + deplist_str = check_output(cmd) + for pkgs, match_pkg in zip(dups_part, + RE_PKG_DEPS.finditer(deplist_str)): + for match_dep in RE_DEPS_PROVIDERS.finditer( + match_pkg.group('dep_list')): + if match_dep.group('name') not in installed: + break + else: + dups_no_deps.append(pkgs) + if len(dups_no_deps) >= number_of_packages: + break + return dups_no_deps + +def _sorted_db_by_size(pkgdb): + """ + @param pkgdb is a list of lists of packages with common name + @return sorted instances of Package according to their size + """ + cmd = ['yum', 'info', '--showduplicates'] + cmd.extend([ps[0].name for ps in pkgdb]) + info_str = check_output(cmd) + pkg_sizes = {} + # to get correct ordering from "yum info" command + # { pkg_name : [(epoch, version, release), ... ] } + pkg_version_order = defaultdict(list) + for info_match in RE_PKG_INFO.finditer(info_str): + pkg_name = info_match.group('name') + size = float(info_match.group('size')) + units = info_match.group('units').lower() + if units: + size *= defaultdict(lambda: 1, + {'k':10**3, 'm':10**6, 'g':10**9})[units] + pkg_sizes[pkg_name] = size + epoch = info_match.group('epoch') + if not epoch: + epoch = "0" + pkg_version_order[pkg_name].append(( + epoch, info_match.group('version'), info_match.group('release'))) + pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[ + :MAX_PKG_DB_SIZE] + + for i, pkgs in enumerate(pkgdb): + pkgs = sorted(pkgs, key=lambda p: + pkg_version_order[pkgs[0].name].index((p.epoch, p.ver, p.rel))) + pkg_kwargs = dict((k, getattr(pkgs[0], k)) for k in ('name', 'arch') ) + for attr in ('epoch', 'ver', 'rel', 'repo'): + pkg_kwargs[attr] = getattr(pkgs[0], attr) + pkg_kwargs['up_'+attr] = getattr(pkgs[-1], attr) + pkgdb[i] = Package(**pkg_kwargs) + return pkgdb + +def _get_repo_list(): + """ + @return list of software repository names + """ + repos_str = check_output(['yum', 'repolist', '-q']) + return RE_REPO.findall(repos_str) + +def _download_pkgdb(repolist, pkgdb, cache_dir=None): + """ + Downloads all rpm packages (old and newer versions) from package database + to current directory. + """ + repo_pkgs = defaultdict(list) + for pkg in pkgdb: + repo_pkgs[pkg.repo].append(pkg.name) + repo_pkgs[pkg.up_repo].append(pkg.name) + base_cmd = ['yumdownloader'] + if cache_dir: + base_cmd.extend(['--destdir', cache_dir]) + for repo, pkgs in repo_pkgs.items(): + cmd = copy.copy(base_cmd) + repos = set(repolist) + repos.remove(repo) + for not_allowed_repo in repos: + cmd.append('--disablerepo='+not_allowed_repo) + cmd.append('--enablerepo='+repo) + cmd.extend(pkgs) + call(cmd) + +def _make_rpm_path(pkg, cache_dir='', newer=True): + """ + @param newer says, whether to use EVR of package to update + (in this case, all epoch/ver/rel attributes will be prefixed with "up_") + @return path to rpm package made from instance of Package + """ + if not isinstance(pkg, Package): + raise TypeError("pkg must be an instance of Package ") + return os.path.join(cache_dir, pkg.get_nevra(newer)) + '.rpm' + +def get_system_architecture(): + """ + @return the system architecture name as seen by rpm + """ + return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm']) + +def write_pkgdb(pkgdb, cache_dir=''): + """ + Writes package database into a file named DB_BACKUP_FILE. + """ + with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file: + pickle.dump((datetime.datetime.now(), pkgdb), db_file) + +def load_pkgdb(cache_dir=''): + """ + This is inverse function to _write_pkgdb(). + @return package database loaded from file + """ + with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file: + date_time, pkgdb = pickle.load(db_file) + #print "Loaded package database from: %s" % date_time + return pkgdb + +def get_pkg_database(force_update=False, use_cache=True, cache_dir=''): + """ + Checks yum database for available packages, that have at least two + different versions in repositories. Only not installed ones with + all of their dependencies intalled are selected. + And from those, few of the smallest are downloaded as rpms. + @return list of instances of Package of selected packages + """ + if ( use_cache and not force_update + and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))): + pkgdb = load_pkgdb(cache_dir) + valid_db = True + for pkg in pkgdb: + if ( not os.path.exists(_make_rpm_path(pkg, cache_dir)) + or not os.path.exists(_make_rpm_path(pkg, cache_dir, True))): + valid_db = False + #print "Old package database is not valid" + break + if valid_db: + return pkgdb + #print "Getting installed packages" + installed = set(check_output( #pylint: disable=E1103 + ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines()) + #print "Getting all available packages" + avail_str = check_output(['yum', 'list', 'available', '--showduplicates']) + # list of lists of packages with the same name, longer than 2 + #print "Finding duplicates" + dups_list = _filter_duplicates(installed, avail_str) + #print "Selecting only those (from %d) with installed dependencies" % \ + #len(dups_list) + selected = _check_pkg_dependencies(installed, dups_list) + #print "Selecting the smallest ones" + pkgdb = _sorted_db_by_size(selected) + if use_cache: + repolist = _get_repo_list() + _download_pkgdb(repolist, pkgdb, cache_dir) + #print "Backing up database information" + write_pkgdb(pkgdb, cache_dir) + return pkgdb + diff --git a/src/software/test/run_tests.py b/src/software/test/run_tests.py new file mode 100755 index 0000000..9d290a1 --- /dev/null +++ b/src/software/test/run_tests.py @@ -0,0 +1,257 @@ +#!/usr/bin/python +# -*- Coding:utf-8 -*- +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. # +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Radek Novacek <rnovacek@redhat.com> +# Authors: Michal Minar <miminar@redhat.com> +""" +Runs all defined tests. + +Test modules are expected to be in directory with this script and +should contain subclasses of unittest.TestCase, that will be loaded. +Preferably a suite() function should be defined there as well. +They must be named according to shell regexp: "test_*.py". +""" + +import argparse +import copy +import getpass +import inspect +import pywbem +import os +import shutil +import sys +import tempfile +import unittest + +import rpmcache + +# this is a global variable, that can be modified by environment or program +# argument +CACHE_DIR = '' + +def parse_cmd_line(): + """ + Use ArgumentParser to parse command line arguments. + @return (parsed arguments object, arguments for unittest.main()) + """ + parser = argparse.ArgumentParser( + add_help=False, # handle help message ourselves + description="Test OpenLMI Software providers. Arguments" + " for unittest main function can be added after \"--\"" + " switch.") + parser.add_argument("--url", + default=os.environ.get("LMI_CIMOM_URL", "http://localhost:5988"), + help="Network port to use for tests") + parser.add_argument("-u", "--user", + default=os.environ.get("LMI_CIMOM_USERNAME", ''), + help="User for broker authentication.") + parser.add_argument("-p", "--password", + default=os.environ.get("LMI_CIMOM_PASSWORD", ''), + help="User's password.") + dangerous_group = parser.add_mutually_exclusive_group() + dangerous_group.add_argument("--run-dangerous", action="store_true", + default=(os.environ.get('LMI_RUN_DANGEROUS', '0') == '1'), + help="Run also tests dangerous for this machine" + " (tests, that manipulate with software database)." + " Overrides environment variable LMI_RUN_DANGEROUS.") + dangerous_group.add_argument('--no-dangerous', action="store_false", + dest="run_dangerous", + default=os.environ.get('LMI_RUN_DANGEROUS', '0') == '1', + help="Disable dangerous tests.") + cache_group = parser.add_mutually_exclusive_group() + cache_group.add_argument("-c", "--use-cache", action="store_true", + default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'), + help="Use a cache directory to download rpm packages for" + " testing purposes. It greatly speeds up testing." + " Also a database file \"${LMI_SOFTWARE_CACHE_DIR}/%s\"" + " will be created to store information" + " for this system. Overrides environment variable" + " LMI_SOFTWARE_USE_CACHE." % rpmcache.DB_BACKUP_FILE) + cache_group.add_argument("--no-cache", action="store_false", + dest="use_cache", + default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'), + help="Do not cache rpm packages for speeding up tests." + " Overrides environment variable LMI_SOFTWARE_USE_CACHE.") + parser.add_argument('--cache-dir', + default=os.environ.get('LMI_SOFTWARE_CACHE_DIR', ''), + help="Use particular directory, instead of temporary one, to store" + " rpm packages for testing purposes. Overrides nevironment" + " variable LMI_SOFTWARE_CACHE_DIR.") + parser.add_argument('--force-update', action="store_true", + help="Force update of package database. Otherwise an old" + " one will be used (if any exists).") + parser.add_argument('-l', '--list-tests', action="store_true", + help="List all possible test names.") + parser.add_argument('-h', '--help', action="store_true", + help="Show help message.") + + argv = copy.copy(sys.argv) + rest = [] + if '--' in argv: + index = argv.index('--') + argv = argv[:index] + rest = sys.argv[index + 1:] + args, unknown_args = parser.parse_known_args(argv) + if args.help: + parser.print_help() + print + print "*"*79 + print " Unit test options (parsed after \"--\" switch)." + print "*"*79 + print + unittest.main(argv=[argv[0], "--help"]) + return (args, unknown_args + rest) + +def try_connection(args): + """ + Try to connect to cim broker. If authentication fails, ask + the user for credentials in loop. + @return (user, password) + """ + user = args.user + password = args.password + while False: + try: + pywbem.WBEMConnection(args.url, (user, password)) + break + except pywbem.cim_http.AuthError as exc: + user = args.user + sys.stderr.write("Authentication error\n") + if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED: + if not user: + user = raw_input("User: ") + password = getpass.getpass() + +def prepare_environment(args): + """ + Set the environment for test scripts. + """ + os.environ['LMI_CIMOM_URL'] = args.url + os.environ['LMI_CIMOM_USERNAME'] = args.user + os.environ['LMI_CIMOM_PASSWORD'] = args.password + os.environ['LMI_RUN_DANGEROUS'] = ( + '1' if args.run_dangerous else '0') + os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0' + if args.use_cache: + os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR + +def load_tests(loader, standard_tests, pattern): + """ + Helper function for unittest.main() test loader. + @return TestSuite instance + """ + this_dir = os.path.dirname(__file__) + if standard_tests: + suite = standard_tests + else: + suite = unittest.TestSuite() + discover_kwargs = dict(start_dir=this_dir) + if pattern is not None: + discover_kwargs['pattern'] = pattern + tests = loader.discover(**discover_kwargs) + suite.addTests(tests) + return suite + +class LMITestLoader(unittest.TestLoader): + """ + Customized test loader to make invoking particular tests a lot easier. + """ + + @staticmethod + def find_in_test_nodes(node, name): + """ + Traverses suite tree nodes to find a test named name. + @param name is a name of test to find + @return desired TestCase or test function + """ + for subnode in node: + if isinstance(subnode, unittest.TestSuite): + subnode = LMITestLoader.find_in_test_nodes(subnode, name) + if subnode is not None: + return subnode + elif isinstance(subnode, unittest.TestCase): + if name == subnode.__class__.__name__: + return subnode + elif inspect.isfunction(subnode) and name == subnode.__name__: + return subnode + + def loadTestsFromName(self, name, module=None): #pylint: disable=C0103 + """ + Override of parent method to make test cases accessible to by their + names from command line. + """ + suite = load_tests(self, None, None) + parts = name.split('.') + node = suite + for part in parts: + node = LMITestLoader.find_in_test_nodes(node, part) + if node is None: + # name not found + return unittest.TestLoader.loadTestsFromName(self, name, module) + return node + +def unwind_test_suite_tree(node): + """ + Make a list of test names out of TestSuite. + @param node is a suite + @return [ test_name, ... ] + """ + result = [] + for subnode in node: + if isinstance(subnode, unittest.TestSuite): + result.extend(unwind_test_suite_tree(subnode)) + elif isinstance(subnode, unittest.TestCase): + result.append( subnode.__class__.__name__ + + '.'+subnode._testMethodName) + elif inspect.isfunction(subnode): + result.append(subnode.__name__) + return result + +def list_tests(): + """ + Prints all available tests to stdout and exits. + """ + suite = load_tests(LMITestLoader(), None, None) + test_names = unwind_test_suite_tree(suite) + print "\n".join(test_names) + sys.exit(0) + +def main(): + """ + Main functionality of script. + """ + global CACHE_DIR + args, ut_args = parse_cmd_line() + if args.list_tests: + list_tests() + if args.use_cache and not args.cache_dir: + CACHE_DIR = tempfile.mkdtemp(suffix="software_database") + elif args.use_cache: + CACHE_DIR = args.cache_dir + try_connection(args) + rpmcache.get_pkg_database(args.force_update, args.use_cache, CACHE_DIR) + prepare_environment(args) + test_program = unittest.main(argv=ut_args, + testLoader=LMITestLoader(), exit=False) + if args.use_cache and not args.cache_dir: + shutil.rmtree(CACHE_DIR) + sys.exit(test_program.result.wasSuccessful()) + +if __name__ == '__main__': + main() + diff --git a/src/software/test/test_software_file_check.py b/src/software/test/test_software_file_check.py index 0e91bfa..cb09282 100755 --- a/src/software/test/test_software_file_check.py +++ b/src/software/test/test_software_file_check.py @@ -18,31 +18,42 @@ # # Authors: Michal Minar <miminar@redhat.com> # +""" +Unit tests for LMI_SoftwareFileCheck provider. +""" -from common import * import hashlib -import shutil -import socket -import stat +import os +import pywbem +import re +import subprocess +import unittest +from collections import namedtuple -re_checksum = re.compile(r'^([0-9a-fA-F]+)\s+.*') +import common -PassedFlags = namedtuple("PassedFlags", +RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*') + +PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103 "exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime") -class TestSoftwareInstalledPackage(SoftwareBaseTestCase): +class TestSoftwareFileCheck(common.SoftwareBaseTestCase): + #pylint: disable=R0904 + """ + Basic cim operations test. + """ CLASS_NAME = "LMI_SoftwareFileCheck" KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState" , "TargetOperatingSystem", "Version") hash_num2algo = { - 1 : hashlib.md5, - 2 : hashlib.sha1, - 8 : hashlib.sha256, - 9 : hashlib.sha384, - 10 : hashlib.sha512, - 11 : hashlib.sha224 + 1 : hashlib.md5, #pylint: disable=E1101 + 2 : hashlib.sha1, #pylint: disable=E1101 + 8 : hashlib.sha256, #pylint: disable=E1101 + 9 : hashlib.sha384, #pylint: disable=E1101 + 10 : hashlib.sha512, #pylint: disable=E1101 + 11 : hashlib.sha224 #pylint: disable=E1101 } hash_num2cmd = { @@ -63,25 +74,74 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): 11 : 56 } - def make_op(self, nevra, filename): - op = self.op.copy() - m = re_nevra.match(nevra) - name, version = [ m.group(k) for k in ("name", "ver") ] - op["Name"] = filename - op["Version"] = version - op["CheckID"] = "%s#%s" % (name, filename) - op["SoftwareElementState"] = pywbem.Uint16(2) - op["TargetOperatingSystem"] = pywbem.Uint16(36) - op["SoftwareElementID"] = nevra - return op + def make_op(self, pkg, filename, newer=True): + """ + @return object path of LMI_SoftwareFileCheck + """ + objpath = self.objpath.copy() + objpath["Name"] = filename + objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver") + objpath["CheckID"] = "%s#%s" % (pkg.name, filename) + objpath["SoftwareElementState"] = pywbem.Uint16(2) + objpath["TargetOperatingSystem"] = pywbem.Uint16(36) + objpath["SoftwareElementID"] = pkg.get_nevra(newer) + return objpath + + def assertEqualSEID(self, id1, id2): + """ + This is for comparison of SoftwareElementID property values. + """ + if ( not isinstance(id1, basestring) + or not isinstance(id2, basestring)): + return common.SoftwareBaseTestCase.assertEqual(self, id1, id2) + match1 = common.RE_NEVRA.match(id1) + match2 = common.RE_NEVRA.match(id2) + if not match1 or not match2: + return common.SoftwareBaseTestCase.assertEqual(self, id1, id2) + if any( match1.group(g) != match2.group(g) + for g in ('name', 'ver', 'rel', 'arch')): + return common.SoftwareBaseTestCase.assertEqual(self, id1, id2) + epoch1 = match1.group('epoch') + epoch2 = match2.group('epoch') + if not epoch1: + epoch1 = '0' + if not epoch2: + epoch2 = '0' + if epoch1 != epoch2: + return common.SoftwareBaseTestCase.assertEqual(self, id1, id2) + return True + + def assertEqual(self, op1, op2): + """ + This is override for object paths, that allows some differences + (like missing epoch in SoftwareElementID). + """ + if ( not isinstance(op1, pywbem.CIMInstanceName) + or not isinstance(op2, pywbem.CIMInstanceName) + or op1.classname != op2.classname + or op1.namespace != op2.namespace + or op1.keybindings.keys() != op2.keybindings.keys() + or any(op1[a] != op2[a] for a in + ( 'Name', 'Version', 'CheckID', 'SoftwareElementState' + , 'TargetOperatingSystem'))): + return common.SoftwareBaseTestCase.assertEqual(self, op1, op2) + return self.assertEqualSEID( + op1['SoftwareElementID'], op2['SoftwareElementID']) + return True def make_checksum_str(self, csumnum, filename): - return re_checksum.match(subprocess.check_output([ + """ + @return checksum of installed file + """ + return RE_CHECKSUM.match(subprocess.check_output([ self.hash_num2cmd[csumnum], filename])).group(1).lower() - def do_test_symlink(self, op, nevra, f, inst): - target = os.readlink(f) - stats = os.lstat(f) + def do_test_symlink(self, filepath, inst): + """ + Assert some details about symlink. + """ + target = os.readlink(filepath) + stats = os.lstat(filepath) self.assertEqual(inst["FileType"], pywbem.Uint16(3)) self.assertEqual(inst["FileUserID"], stats.st_uid) @@ -96,9 +156,9 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): # modify owner prev_user = inst["FileUserID"] - prev_mtime = inst["LastModificationTime"] - prev_pflags = PassedFlags(*inst["PassedFlags"]) - os.lchown(f, stats.st_uid + 1, -1) + #prev_mtime = inst["LastModificationTime"] + #prev_pflags = PassedFlags(*inst["PassedFlags"]) + os.lchown(filepath, stats.st_uid + 1, -1) inst = self.conn.GetInstance(InstanceName=inst.path) self.assertEqual(inst["FileUserID"], inst["ExpectedFileUserID"] + 1) self.assertEqual(inst["FileUserID"], prev_user + 1) @@ -118,9 +178,10 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertTrue(cur_pflags.mtime) # modify link_target - os.remove(f) - os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), f) - os.lchown(f, inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"]) + os.remove(filepath) + os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath) + os.lchown(filepath, inst["ExpectedFileUserID"], + inst["ExpectedFileGroupID"]) inst = self.conn.GetInstance(InstanceName=inst.path) cur_pflags = PassedFlags(*inst["PassedFlags"]) @@ -138,8 +199,11 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertTrue(cur_pflags.gid) self.assertTrue(cur_pflags.mtime) - def do_test_directory(self, op, nevra, f, inst): - stats = os.lstat(f) + def do_test_directory(self, filepath, inst): + """ + Assert some details about directory. + """ + stats = os.lstat(filepath) self.assertEqual(inst["FileType"], pywbem.Uint16(2)) self.assertEqual(inst["FileUserID"], stats.st_uid) @@ -152,8 +216,11 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertEqual(inst["LastModificationTime"], int(stats.st_mtime)) - def do_test_file(self, op, nevra, f, inst): - stats = os.lstat(f) + def do_test_file(self, filepath, inst): + """ + Assert some details about regurar file. + """ + stats = os.lstat(filepath) self.assertEqual(inst["FileType"], pywbem.Uint16(1)) self.assertEqual(inst["FileUserID"], stats.st_uid) @@ -161,7 +228,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertEqual(inst["FileMode"], stats.st_mode) self.assertEqual(inst["FileSize"], stats.st_size) self.assertIs(inst["LinkTarget"], None) - csum = self.make_checksum_str(inst['FileChecksumType'], f) + csum = self.make_checksum_str(inst['FileChecksumType'], filepath) self.assertEqual(inst["FileChecksum"].lower(), csum) self.assertEqual(inst["ExpectedLastModificationTime"], inst["LastModificationTime"]) @@ -169,7 +236,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): int(stats.st_mtime)) # make it longer - with open(f, "a+") as fobj: + with open(filepath, "a+") as fobj: fobj.write("data\n") inst = self.conn.GetInstance(InstanceName=inst.path) cur_pflags = PassedFlags(*inst["PassedFlags"]) @@ -189,9 +256,10 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertFalse(cur_pflags.mtime) # change file type - os.remove(f) - os.symlink(f, f) - os.lchown(f, inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"]) + os.remove(filepath) + os.symlink(filepath, filepath) + os.lchown(filepath, inst["ExpectedFileUserID"], + inst["ExpectedFileGroupID"]) inst = self.conn.GetInstance(InstanceName=inst.path) cur_pflags = PassedFlags(*inst["PassedFlags"]) self.assertNotEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"]) @@ -213,7 +281,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertFalse(cur_pflags.mtime) # remove it - os.remove(f) + os.remove(filepath) inst = self.conn.GetInstance(InstanceName=inst.path) cur_pflags = PassedFlags(*inst["PassedFlags"]) self.assertEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"]) @@ -236,73 +304,82 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertFalse(cur_pflags.gid) self.assertFalse(cur_pflags.mtime) + @common.mark_dangerous def test_get_instance(self): - for nevra, files in pkg_files: - m = re_nevra.match(nevra) - name, version = [ m.group(k) for k in ("name", "ver") ] - if is_installed(nevra=nevra) and not verify_pkg(name): - remove_pkg(name) - if not is_installed(name): - install_pkg(nevra=nevra) - self.assertTrue(is_installed(nevra=nevra)) - for f in files: - op = self.make_op(nevra, f) - - inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False) + """ + Tests GetInstance call. + """ + for pkg in self.pkgdb: + files = self.pkg_files[pkg.name] + if ( common.is_installed(pkg) + and not common.verify_pkg(pkg.name)): + common.remove_pkg(pkg.name) + if not common.is_installed(pkg.name): + common.install_pkg(pkg) + self.assertTrue(common.is_installed(pkg)) + for filepath in files: + objpath = self.make_op(pkg, filepath) + + inst = self.conn.GetInstance( + InstanceName=objpath, + LocalOnly=False) self.assertIsInstance(inst, pywbem.CIMInstance) - self.assertEqual(inst.path, op) + self.assertEqual(inst.path, objpath) for key in self.KEYS: - self.assertEqual(inst[key], op[key]) + if key.lower() == "softwareelementid": + self.assertEqualSEID(inst[key], objpath[key]) + else: + self.assertEqual(inst[key], objpath[key]) self.assertTrue(inst["FileExists"]) self.assertEqual(len(inst["PassedFlags"]), 10) - self.assertTrue(all(f is True for f in inst["PassedFlags"])) + self.assertTrue(all( filepath is True + for filepath in inst["PassedFlags"])) for prop in ( "FileType", "FileUserID", "FileGroupID" , "FileMode", "FileSize", "LinkTarget" , "FileChecksum", "FileModeFlags"): self.assertEqual(inst["Expected"+prop], inst[prop]) - if os.path.islink(f): - self.do_test_symlink(op, nevra, f, inst) - elif os.path.isdir(f): - self.do_test_directory(op, nevra, f, inst) - elif os.path.isfile(f): - self.do_test_file(op, nevra, f, inst) - + if os.path.islink(filepath): + self.do_test_symlink(filepath, inst) + elif os.path.isdir(filepath): + self.do_test_directory(filepath, inst) + elif os.path.isfile(filepath): + self.do_test_file(filepath, inst) + + @common.mark_dangerous def test_invoke_method(self): - for nevra, files in pkg_files: - m = re_nevra.match(nevra) - name, version = [ m.group(k) for k in ("name", "ver") ] - if is_installed(nevra=nevra) and not verify_pkg(name): - remove_pkg(name) - if not is_installed(name): - install_pkg(nevra=nevra) - self.assertTrue(is_installed(nevra=nevra)) - for f in files: - op = self.make_op(nevra, f) - - (rval, oparms) = self.conn.InvokeMethod( + """ + Tests Invoke method invocation. + """ + for pkg in self.pkgdb: + files = self.pkg_files[pkg.name] + if common.is_installed(pkg) and not common.verify_pkg(pkg.name): + common.remove_pkg(pkg.name) + if not common.is_installed(pkg.name): + common.install_pkg(pkg) + self.assertTrue(common.is_installed(pkg)) + for filepath in files: + objpath = self.make_op(pkg, filepath) + + (rval, _) = self.conn.InvokeMethod( MethodName="Invoke", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(0)) # modify file - if os.path.isfile(f): - os.remove(f) + if os.path.isfile(filepath): + os.remove(filepath) else: - os.lchown(f, os.stat(f).st_uid + 1, -1) - (rval, oparms) = self.conn.InvokeMethod( + os.lchown(filepath, os.stat(filepath).st_uid + 1, -1) + (rval, _) = self.conn.InvokeMethod( MethodName="Invoke", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(2)) - @classmethod - def tearDownClass(cls): - for pkg_nevra, files in pkg_files: - name = re_nevra.match(pkg_nevra).group('name') - if is_installed(name) and not verify_pkg(name): - remove_pkg(name) - if not is_installed(name): - install_pkg(nevra=pkg_nevra) +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestSoftwareFileCheck) if __name__ == "__main__": unittest.main() diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py index 54b8f35..c6e64de 100755 --- a/src/software/test/test_software_installed_package.py +++ b/src/software/test/test_software_installed_package.py @@ -18,198 +18,240 @@ # # Authors: Michal Minar <miminar@redhat.com> # +""" +Unit tests for LMI_SoftwareInstalledPackage provider. +""" -from common import * +import os +import pywbem import shutil import socket import stat +import unittest -class TestSoftwareInstalledPackage(SoftwareBaseTestCase): +import common +import rpmcache + +class TestSoftwareInstalledPackage(common.SoftwareBaseTestCase): + """ + Basic cim operations test. + """ CLASS_NAME = "LMI_SoftwareInstalledPackage" KEYS = ("Software", "System") - def make_op(self, name, epoch, ver, rel, arch, ses=2): - op = self.op.copy() + def make_op(self, pkg, newer=True, ses=2): + """ + @param ses SoftwareElementState property value + @return object path of LMI_SoftwareInstaledPackage + """ + objpath = self.objpath.copy() pkg_op = pywbem.CIMInstanceName( classname="LMI_SoftwarePackage", namespace="root/cimv2", keybindings={ - "Name" : name, - "SoftwareElementID" : make_nevra( - name, epoch, ver, rel, arch, "ALWAYS"), + "Name" : pkg.name, + "SoftwareElementID" : pkg.get_nevra(newer, 'ALWAYS'), "SoftwareElementState" : pywbem.Uint16(ses), "TargetOperatingSystem" : pywbem.Uint16(36), - "Version" : ver }) + "Version" : getattr(pkg, 'up_ver' if newer else 'ver') }) system_op = pywbem.CIMInstanceName( classname="Linux_ComputerSystem", namespace="root/cimv2", keybindings={ "CreationClassName" : "Linux_ComputerSystem", "Name" : socket.gethostname() }) - op["Software"] = pkg_op - op["System"] = system_op - return op + objpath["Software"] = pkg_op + objpath["System"] = system_op + return objpath + @common.mark_dangerous def test_get_instance(self): - for pkg in packages: - if not is_installed(*pkg[:5]): - install_pkg(*pkg[:5]) - op = self.make_op(*pkg[:5]) - inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False) + """ + Tests GetInstance call on packages from our rpm cache. + TODO: test this in non-dangerous way + """ + for pkg in self.pkgdb: + if not common.is_installed(pkg): + common.install_pkg(pkg) + objpath = self.make_op(pkg) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) self.assertIsSubclass(inst.path.classname, self.CLASS_NAME) self.assertIsSubclass( - inst.path['System'].classname, op['System'].classname) - self.assertIsSubclass( - inst.path['Software'].classname, op['Software'].classname) + inst.path['System'].classname, objpath['System'].classname) + self.assertIsSubclass(inst.path['Software'].classname, + objpath['Software'].classname) self.assertEqual(len(inst.path.keys()), 2) for key in self.KEYS: self.assertEqual(inst[key], inst.path[key]) - self.assertEqual(inst['Software'], op['Software']) - remove_pkg(pkg.name) - op['Software']['SoftwareElementState'] = pywbem.Uint16(1) - with self.assertRaises(pywbem.CIMError) as cm: - self.conn.GetInstance(InstanceName=op, LocalOnly=False) - self.assertEqual(cm.exception.args[0], pywbem.CIM_ERR_NOT_FOUND) + self.assertEqual(inst['Software'], objpath['Software']) + common.remove_pkg(pkg.name) + objpath['Software']['SoftwareElementState'] = pywbem.Uint16(1) + with self.assertRaises(pywbem.CIMError) as cmngr: + self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_NOT_FOUND) + @common.mark_dangerous def test_enum_instances(self): - pkg = packages[0] - if is_installed(*pkg[:5]): - remove_pkg(pkg.name) + """ + Tests EnumInstances call. + TODO: test this in non-dangerous way + """ + pkg = self.pkgdb[0] + if common.is_installed(pkg.name): + common.remove_pkg(pkg.name) insts1 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) - install_pkg(*pkg[:5]) + common.install_pkg(pkg) insts2 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) self.assertEqual(len(insts1) + 1, len(insts2)) - op = self.make_op(*pkg[:5]) - self.assertIn(op['Software'], (inst['Software'] for inst in insts2)) + objpath = self.make_op(pkg) + self.assertIn(objpath['Software'], + (inst['Software'] for inst in insts2)) self.assertTrue(all(isinstance(inst, pywbem.CIMInstanceName) for inst in insts1)) insts1 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) - remove_pkg(pkg.name) + common.remove_pkg(pkg.name) insts2 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) self.assertEqual(len(insts2) + 1, len(insts1)) - self.assertIn(op['Software'], (inst['Software'] for inst in insts1)) + self.assertIn(objpath['Software'], + (inst['Software'] for inst in insts1)) self.assertTrue(all(inst['Software'] == inst.path['Software'] for inst in insts1)) self.assertTrue(all(inst['System'] == inst.path['System'] for inst in insts1)) + @common.mark_dangerous def test_create_instance(self): - for pkg in packages: - if is_installed(pkg.name): - remove_pkg(pkg.name) - self.assertFalse(is_installed(pkg.name)) - op = self.make_op(*(list(pkg[:5]) + [1])) - inst = pywbem.CIMInstance(classname=op.classname, path=op) - inst["Software"] = op["Software"] - inst["System"] = op["System"] + """ + Tests CreateInstance call. + """ + for pkg in self.pkgdb: + if common.is_installed(pkg.name): + common.remove_pkg(pkg.name) + self.assertFalse(common.is_installed(pkg.name)) + objpath = self.make_op(pkg, ses=1) + inst = pywbem.CIMInstance(classname=objpath.classname, path=objpath) + inst["Software"] = objpath["Software"] + inst["System"] = objpath["System"] iname = self.conn.CreateInstance(NewInstance=inst) self.assertIsInstance(iname, pywbem.CIMInstanceName) - op["Software"]["SoftwareElementState"] = pywbem.Uint16(2) - self.assertEqual(iname["Software"], op["Software"]) + objpath["Software"]["SoftwareElementState"] = pywbem.Uint16(2) + self.assertEqual(iname["Software"], objpath["Software"]) self.assertIsInstance(iname["System"], pywbem.CIMInstanceName) self.assertIsSubclass(iname["System"].classname, - op["System"].classname) - self.assertEqual(set(iname.keys()), set(op.keys())) - self.assertTrue(is_installed(pkg.name)) + objpath["System"].classname) + self.assertEqual(set(iname.keys()), set(objpath.keys())) + self.assertTrue(common.is_installed(pkg.name)) - with self.assertRaises(pywbem.CIMError) as cm: + with self.assertRaises(pywbem.CIMError) as cmngr: self.conn.CreateInstance(NewInstance=inst) - self.assertEqual(cm.exception.args[0], + self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_ALREADY_EXISTS) - + + @common.mark_dangerous def test_delete_instance(self): - for pkg in packages: - if not is_installed(pkg.name): - install_pkg(*pkg[:5]) - self.assertTrue(is_installed(pkg.name)) - op = self.make_op(*pkg[:5]) - self.conn.DeleteInstance(op) - self.assertFalse(is_installed(pkg.name)) - with self.assertRaises(pywbem.CIMError) as cm: - self.conn.DeleteInstance(op) - self.assertEqual(cm.exception.args[0], pywbem.CIM_ERR_NOT_FOUND) + """ + Tests DeleteInstance call. + """ + for pkg in self.pkgdb: + if not common.is_installed(pkg.name): + common.install_pkg(pkg) + self.assertTrue(common.is_installed(pkg.name)) + objpath = self.make_op(pkg) + self.conn.DeleteInstance(objpath) + self.assertFalse(common.is_installed(pkg.name)) + with self.assertRaises(pywbem.CIMError) as cmngr: + self.conn.DeleteInstance(objpath) + self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_NOT_FOUND) + @common.mark_dangerous def test_check_integrity(self): - for pkg_nevra, files in pkg_files: - m = re_nevra.match(pkg_nevra) - name, epoch, ver, rel, arch = [ m.group(k) for k in ( - "name", "epoch", "ver", "rel", "arch") ] - if ( (is_installed(nevra=pkg_nevra) and not verify_pkg(name)) - or (is_installed(name) and not is_installed(nevra=pkg_nevra))) : - remove_pkg(name) - if not is_installed(name): - install_pkg(name, epoch, ver, rel, arch) - self.assertTrue(is_installed(nevra=pkg_nevra)) + """ + Tests CheckIntegrity call. + TODO: test this in non-dangerous way + """ + for pkg in self.pkgdb: + files = self.pkg_files[pkg.name] + if ( ( common.is_installed(pkg) + and not common.verify_pkg(pkg.name)) + or ( common.is_installed(pkg.name) + and not common.is_installed(pkg))) : + common.remove_pkg(pkg.name) + if not common.is_installed(pkg.name): + common.install_pkg(pkg) + self.assertTrue(common.is_installed(pkg)) - op = self.make_op(name, epoch, ver, rel, arch) + objpath = self.make_op(pkg) (rval, oparms) = self.conn.InvokeMethod( MethodName="CheckIntegrity", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(0)) # check passed self.assertEqual(len(oparms), 1) self.assertIn("Failed", oparms) self.assertEqual(len(oparms["Failed"]), 0) cnt_bad = 0 - for f in files: - stats = os.lstat(f) - if os.path.islink(f): # modify symbolic link - target = os.readlink(f) - os.remove(f) - os.symlink(target, f) # just touch symlink + for file_path in files: + stats = os.lstat(file_path) + if os.path.islink(file_path): # modify symbolic link + target = os.readlink(file_path) + os.remove(file_path) + os.symlink(target, file_path) # just touch symlink (rval, oparms) = self.conn.InvokeMethod( MethodName="CheckIntegrity", - ObjectName=op) + ObjectName=objpath) # symlink must pass self.assertEqual(len(oparms["Failed"]), cnt_bad) - os.remove(f) + os.remove(file_path) # now change target - os.symlink("wrong_link_target", f) - elif os.path.isdir(f): # check directory - os.chmod(f, stats.st_mode) # just touch dir + os.symlink("wrong_link_target", file_path) + elif os.path.isdir(file_path): # check directory + os.chmod(file_path, stats.st_mode) # just touch dir (rval, oparms) = self.conn.InvokeMethod( MethodName="CheckIntegrity", - ObjectName=op) + ObjectName=objpath) # dir must pass self.assertEqual(len(oparms["Failed"]), cnt_bad) # modify read access of directory - os.chmod(f, stats.st_mode ^ stat.S_IROTH) + os.chmod(file_path, stats.st_mode ^ stat.S_IROTH) else: # modify regular file # just touch file - this is enough to make it fail - with open(f, "w+") as fobj: pass + with open(file_path, "w+"): + pass cnt_bad += 1 (rval, oparms) = self.conn.InvokeMethod( MethodName="CheckIntegrity", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(1)) self.assertEqual(len(oparms), 1) self.assertIn("Failed", oparms) self.assertEqual(len(oparms["Failed"]), cnt_bad) - self.assertIn(f, (p["Name"] for p in oparms["Failed"])) + self.assertIn(file_path, (p["Name"] for p in oparms["Failed"])) + @common.mark_dangerous def test_method_update(self): - for pkg in packages: - if not pkg.updatable: continue - if is_installed(pkg.name) and not is_installed(*pkg[:5]): - remove_pkg(pkg.name) - if not is_installed(pkg.name): - install_pkg(*pkg[:5]) - self.assertTrue(is_installed(*pkg[:5])) - - op = self.make_op(*pkg[:5]) - op_up = self.make_op(pkg.name, pkg.up_epoch, - pkg.up_ver, pkg.up_rel, pkg.arch) + """ + Tests Update method invocation. + """ + for pkg in self.pkgdb: + if ( common.is_installed(pkg.name) + and not common.is_installed(pkg, False)): + common.remove_pkg(pkg.name) + if not common.is_installed(pkg.name): + common.install_pkg(pkg, False) + self.assertTrue(common.is_installed(pkg, False)) + + objpath = self.make_op(pkg, False) + op_up = self.make_op(pkg) (rval, oparms) = self.conn.InvokeMethod( MethodName="Update", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint16(1)) self.assertEqual(len(oparms), 1) self.assertIn("Installed", oparms) self.assertEqual(oparms["Installed"], op_up["Software"]) - self.assertTrue(is_installed(pkg.name, pkg.up_epoch, - pkg.up_ver, pkg.up_rel, pkg.arch)) + self.assertTrue(common.is_installed(pkg)) (rval, oparms) = self.conn.InvokeMethod( MethodName="Update", @@ -217,44 +259,42 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase): self.assertEqual(rval, pywbem.Uint16(0)) self.assertEqual(len(oparms), 1) self.assertEqual(oparms["Installed"], op_up["Software"]) + self.assertTrue(common.is_installed(pkg.name)) + self.assertFalse(common.is_installed(pkg, False)) - with self.assertRaises(pywbem.CIMError) as cm: + with self.assertRaises(pywbem.CIMError) as cmngr: self.conn.InvokeMethod( MethodName="Update", - ObjectName=op) - self.assertEqual(cm.exception.args[0], + ObjectName=objpath) + self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_NOT_FOUND) - remove_pkg(pkg.name) - install_pkg(*pkg[:5]) - self.assertTrue(is_installed(*pkg[:5])) + common.remove_pkg(pkg.name) + common.install_pkg(pkg, False) + self.assertTrue(common.is_installed(pkg, False)) (rval, oparms) = self.conn.InvokeMethod( MethodName="Update", - ObjectName=op, + ObjectName=objpath, Epoch=pkg.epoch, Version=pkg.ver, Release=pkg.rel) self.assertEqual(rval, pywbem.Uint16(0)) - self.assertEqual(oparms["Installed"], op["Software"]) - + self.assertEqual(oparms["Installed"], objpath["Software"]) + (rval, oparms) = self.conn.InvokeMethod( MethodName="Update", - ObjectName=op, + ObjectName=objpath, Epoch=pkg.up_epoch, Version=pkg.up_ver, Release=pkg.up_rel) self.assertEqual(rval, pywbem.Uint16(1)) self.assertEqual(oparms["Installed"], op_up["Software"]) - @classmethod - def tearDownClass(cls): - for pkg_nevra, files in pkg_files: - name = re_nevra.match(pkg_nevra).group('name') - if is_installed(name) and not verify_pkg(name): - remove_pkg(name) - if not is_installed(name): - install_pkg(nevra=pkg_nevra) +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestSoftwareInstalledPackage) if __name__ == "__main__": unittest.main() diff --git a/src/software/test/test_software_package.py b/src/software/test/test_software_package.py index f6fdf9c..d0144ca 100755 --- a/src/software/test/test_software_package.py +++ b/src/software/test/test_software_package.py @@ -18,91 +18,120 @@ # # Authors: Michal Minar <miminar@redhat.com> # +""" +Unit tests for LMI_SoftwareInstalledPackage provider. +""" -from common import * +import pywbem +import unittest -class TestSoftwarePackage(SoftwareBaseTestCase): +import common +import rpmcache + +class TestSoftwarePackage(common.SoftwareBaseTestCase): #pylint: disable=R0904 + """ + Basic cim operations test. + """ CLASS_NAME = "LMI_SoftwarePackage" KEYS = ( "Name", "SoftwareElementID", "SoftwareElementState" , "TargetOperatingSystem", "Version") - def make_op(self, name, epoch, ver, rel, arch, ses=2): - op = self.op.copy() - op["Name"] = name - op["SoftwareElementID"] = make_nevra( - name, epoch, ver, rel, arch, "ALWAYS") - op["SoftwareElementState"] = pywbem.Uint16(ses) - op["TargetOperatingSystem"] = pywbem.Uint16(36) - op["Version"] = ver - return op + def make_op(self, pkg, newer=True, ses=2): + """ + @param ses SoftwareElementState property value + @return object path of SoftwarePackage + """ + objpath = self.objpath.copy() + objpath["Name"] = pkg.name + objpath["SoftwareElementID"] = pkg.get_nevra(newer, "ALWAYS") + objpath["SoftwareElementState"] = pywbem.Uint16(ses) + objpath["TargetOperatingSystem"] = pywbem.Uint16(36) + objpath["Version"] = getattr(pkg, 'up_ver' if newer else 'ver') + return objpath + @common.mark_dangerous def test_get_instance(self): - for pkg in packages: - if is_installed(pkg.name): - remove_pkg(pkg.name) - op = self.make_op(pkg.name, pkg.epoch, pkg.ver, - pkg.rel, pkg.arch, 1) - inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False) - self.assertEqual(inst.path, op) + """ + Tests GetInstance call on packages from our rpm cache. + TODO: test this in non-dangerous way + """ + for pkg in self.pkgdb: + if common.is_installed(pkg.name): + common.remove_pkg(pkg.name) + objpath = self.make_op(pkg, ses=1) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertEqual(inst.path, objpath) for key in self.KEYS: self.assertTrue(inst.properties.has_key(key)) self.assertEqual(inst.path[key], inst[key]) - self.assertEqual(inst['Release'], pkg.rel) - install_pkg(*pkg[:5]) - op['SoftwareElementState'] = pywbem.Uint16(2) - inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False) - self.assertEqual(inst.path, op) + self.assertEqual(inst['Release'], pkg.up_rel) + common.install_pkg(pkg) + objpath['SoftwareElementState'] = pywbem.Uint16(2) + inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False) + self.assertEqual(inst.path, objpath) # try to leave out epoch part if pkg.epoch == "0": - op_no_epoch = op.copy() - op_no_epoch["SoftwareElementID"] = make_nevra(pkg.name, pkg.epoch, - pkg.ver, pkg.rel, pkg.arch, "NEVER") + op_no_epoch = objpath.copy() + op_no_epoch["SoftwareElementID"] = rpmcache.make_nevra(pkg.name, + pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER") self.assertNotIn(":", op_no_epoch["SoftwareElementID"]) inst = self.conn.GetInstance( InstanceName=op_no_epoch, LocalOnly=False) - self.assertIn(inst.path, (op, op_no_epoch)) + self.assertIn(inst.path, (objpath, op_no_epoch)) + @common.mark_dangerous def test_method_install(self): - for pkg in packages: - if is_installed(pkg.name): - remove_pkg(pkg.name) - op = self.make_op(pkg.name, pkg.epoch, pkg.ver, - pkg.rel, pkg.arch, 1) + """ + Tests Install method invocation. + """ + for pkg in self.pkgdb: + if common.is_installed(pkg.name): + common.remove_pkg(pkg.name) + objpath = self.make_op(pkg, ses=1) (rval, oparms) = self.conn.InvokeMethod( MethodName="Install", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(1)) self.assertEqual(len(oparms), 1) self.assertTrue(oparms.has_key('Installed')) - op['SoftwareElementState'] = pywbem.Uint16(2) - self.assertEqual(oparms['Installed'], op) - self.assertTrue(is_installed(pkg.name)) + objpath['SoftwareElementState'] = pywbem.Uint16(2) + self.assertEqual(oparms['Installed'], objpath) + self.assertTrue(common.is_installed(pkg.name)) (rval, oparms) = self.conn.InvokeMethod( MethodName="Install", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(0)) self.assertEqual(len(oparms), 1) - self.assertEqual(oparms['Installed'], op) + self.assertEqual(oparms['Installed'], objpath) + @common.mark_dangerous def test_method_remove(self): - for pkg in packages: - if not is_installed(pkg.name): - install_pkg(*pkg[:5]) - op = self.make_op(*pkg[:5]) + """ + Tests Remove method invocation. + """ + for pkg in self.pkgdb: + if not common.is_installed(pkg.name): + common.install_pkg(pkg) + objpath = self.make_op(pkg) (rval, oparms) = self.conn.InvokeMethod( MethodName="Remove", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint16(1)) self.assertEqual(len(oparms), 0) - self.assertFalse(is_installed(pkg.name)) - op["SoftwareElementState"] = pywbem.Uint16(1) + self.assertFalse(common.is_installed(pkg.name)) + objpath["SoftwareElementState"] = pywbem.Uint16(1) (rval, oparms) = self.conn.InvokeMethod( MethodName="Remove", - ObjectName=op) + ObjectName=objpath) self.assertEqual(rval, pywbem.Uint32(0)) self.assertEqual(len(oparms), 0) +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestSoftwarePackage) + if __name__ == '__main__': unittest.main() |