diff options
Diffstat (limited to 'src/software/test/common.py')
-rw-r--r-- | src/software/test/common.py | 225 |
1 files changed, 148 insertions, 77 deletions
diff --git a/src/software/test/common.py b/src/software/test/common.py index f61480a..7726d3e 100644 --- a/src/software/test/common.py +++ b/src/software/test/common.py @@ -1,4 +1,4 @@ -# +# -*- encoding: utf-8 -*- # Copyright (C) 2012 Red Hat, Inc. All rights reserved. # # This library is free software; you can redistribute it and/or @@ -17,109 +17,143 @@ # # Authors: Michal Minar <miminar@redhat.com> # +""" +Common utilities and base class for all software tests. +""" -from collections import namedtuple import os import pywbem import re import subprocess +import tempfile import unittest +from subprocess import check_output + +import rpmcache -SCHEMA="http" -HOSTNAME="localhost" -PORT=5988 -USER='' -PASSWORD='' - -Package = namedtuple("Package", - "name, epoch, ver, rel, arch, " - "updatable, up_epoch, up_ver, up_rel") -# This is a database of packages used in tests. If you modify this, please -# ensure, that all corresponding rpm packages are present in test directory. -packages = [ Package(*args) for args in - ( ( "python-xlib-doc", "0", "0.15", "0.6.rc1.fc17", "noarch", False - , None, None, None) - , ( "slv2", "0", "0.6.6", "8.fc17", "x86_64", True - , "0", "0.6.6", "9.fc17") - ) ] - -pkg_files = ( - ( "bash-completion-1:2.0-1.fc17.noarch", - ( "/usr/share/bash-completion/completions/vgs" # symlink - , "/usr/share/doc/bash-completion-2.0/README" # file - , "/usr/share/doc/bash-completion-2.0" # directory - ) - , ) - , ) - -re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)' +RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') -def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): - """ - @param with_epoch may be one of: - "NOT_ZERO" - include epoch only if it's not zero - "ALWAYS" - include epoch always - "NEVER" - do not include epoch at all - """ - estr = '' - if with_epoch.lower() == "always": - estr = epoch - elif with_epoch.lower() == "not_zero": - if epoch != "0": - estr = epoch - if len(estr): estr += ":" - return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) - -def remove_pkg(name, *args): - subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", name]) - -def install_pkg(*args, **kwargs): - if len(args) > 1 or len(kwargs) > 0: - if kwargs.has_key('nevra'): - m = re_nevra.match(kwargs['nevra']) - args = [ m.group(k) for k in ( - 'name', 'epoch', 'ver', 'rel', 'arch') ] - nevra = make_nevra(*args, with_epoch="NEVER") - rpm_name = nevra + ".rpm" +def remove_pkg(pkg, *args): + """ + Remove package with rpm command. + @param pkg is either instance of Package or package name + @param args is a list of parameters for rpm command + """ + if isinstance(pkg, rpmcache.Package): + pkg = pkg.name + subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", pkg]) + +def install_pkg(pkg, newer=True): + """ + Install a specific package. + @param pkg is either package name or instance of Package + In latter case, a specific version is installed. + """ + if isinstance(pkg, rpmcache.Package): + rpm_name = pkg.get_nevra(newer) + ".rpm" if os.path.exists(rpm_name): subprocess.call(["rpm", "--quiet", "-i", rpm_name]) return - subprocess.call(["yum", "-q", "-y", "install", args[0]]) + pkg = pkg.name + subprocess.call(["yum", "-q", "-y", "install", pkg]) -def is_installed(*args, **kwargs): - if len(args) == 1: - return subprocess.call(["rpm", "--quiet", "-q", args[0]]) == 0 +def is_installed(pkg, newer=True): + """ + Check, whether package is installed. + Accepts the same parameters as install_pkg. + @see install_pkg + """ + if not isinstance(pkg, rpmcache.Package): + return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0 else: - if kwargs.has_key("nevra"): - nevra = kwargs["nevra"] - name = re_nevra.match(nevra).group('name') - else: - nevra = make_nevra(*args) - name = args[0] try: out = subprocess.check_output( - ["rpm", "-q", "--qf", "%{NEVRA}", name]) - return out == nevra + ["rpm", "-q", "--qf", "%{NEVRA}", pkg.name]) + return out == pkg.get_nevra(newer) except subprocess.CalledProcessError: return False def verify_pkg(name): + """ + @return output of command rpm, with verification output for package + """ return subprocess.call(["rpm", "--quiet", "-Va", name]) == 0 -class SoftwareBaseTestCase(unittest.TestCase): +def mark_dangerous(method): + """ + Decorator for methods of unittest.TestCase subclasses, that + skips dangerous tests, if environment variable does not allow them. + """ + if os.environ.get('LMI_RUN_DANGEROUS', '0') == '1': + return method + else: + return unittest.skip("This test is marked as dangerous.")(method) + +def is_config_file(pkg, file_path): + """ + @return True, if file_path is a configuration file of package pkg. + """ + out = check_output(['rpm', '-qc', pkg.name]) + return file_path in set(out.splitlines()) + +def is_doc_file(pkg, file_path): + """ + @return True, if file_path is a documentation file of package pkg. + """ + out = check_output(['rpm', '-qd', pkg.name]) + return file_path in set(out.splitlines()) + +def get_pkg_files(pkg): + """ + Tries to make a the heterogenous and smallest set of test files from + package. + @param pkg must be installed package + @return list of few files installed by pkg + """ + output = check_output(['rpm', '-ql', pkg.name]) + configs = set() + docs = set() + dirs = set() + files = set() + symlinks = set() + for fpath in output.splitlines(): + if ( len(dirs) == 0 + and not os.path.islink(fpath) + and os.path.isdir(fpath)): + dirs.add(fpath) + elif len(symlinks) == 0 and os.path.islink(fpath): + symlinks.add(fpath) + elif not os.path.islink(fpath) and os.path.isfile(fpath): + if len(configs) == 0 and is_config_file(pkg, fpath): + configs.add(fpath) + elif len(docs) == 0 and is_doc_file(pkg, fpath): + docs.add(fpath) + elif len(files) == 0: + files.add(fpath) + out = list(configs) + list(docs) + list(dirs) + list(symlinks) + if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0: + out += list(files) + return out + +class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904 + """ + Base class for all LMI Software test classes. + """ + + CLASS_NAME = "Define in subclass" def setUp(self): unittest.TestCase.setUp(self) - self.url = "%s://%s:%d" % (SCHEMA, HOSTNAME, PORT) - self.conn = pywbem.WBEMConnection(self.url, (USER, PASSWORD)) - self.op = pywbem.CIMInstanceName( + self.objpath = pywbem.CIMInstanceName( namespace="root/cimv2", classname=self.CLASS_NAME) - def tearDown(self): - del self.conn - - def assertIsSubclass(self, cls, base_cls): + def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103 + """ + Checks, whether cls is subclass of base_cls from CIM perspective. + @param cls name of subclass + @param base_cls name of base class + """ if not isinstance(cls, basestring): raise TypeError("cls must be a string") if not isinstance(base_cls, basestring): @@ -127,3 +161,40 @@ class SoftwareBaseTestCase(unittest.TestCase): return self.assertTrue(pywbem.is_subclass(self.conn, "root/cimv2", base_cls, cls)) + @classmethod + def setUpClass(cls): + cls.url = os.environ.get('LMI_CIMOM_URL', 'http://localhost:5988') + cls.user = os.environ.get('LMI_CIMOM_USERNAME', '') + cls.password = os.environ.get('LMI_CIMOM_PASSWORD', '') + cls.broker = os.environ.get('LMI_CIMOM_BROKER', None) + cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password)) + cls.run_dangerous = ( + os.environ.get('LMI_RUN_DANGEROUS', '0') == '1') + use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1' + cls.cache_dir = None + if use_cache: + cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None) + if cls.cache_dir is None: + cls.cache_dir = tempfile.mkdtemp(suffix="software_database") + if cls.cache_dir: + cls.prev_dir = os.getcwd() + if not os.path.exists(cls.cache_dir): + os.makedirs(cls.cache_dir) + # rpm packages are expected to be in CWD + os.chdir(cls.cache_dir) + cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache) + for pkg in cls.pkgdb: + if not is_installed(pkg.name): + install_pkg(pkg) + cls.pkg_files = dict((pkg.name, get_pkg_files(pkg)) + for pkg in cls.pkgdb) + + @classmethod + def tearDownClass(cls): + if cls.run_dangerous: + for pkg in cls.pkgdb: + if is_installed(pkg.name): + remove_pkg(pkg.name) + if hasattr(cls, "prev_dir"): + os.chdir(cls.prev_dir) + |