# -*- encoding: utf-8 -*-
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Michal Minar
#
"""
Common utilities and base class for all software tests.
"""

import os
import re
import tempfile
import unittest
from subprocess import call, check_output

import pywbem

import rpmcache

# Splits a "name-[epoch:]version-release.arch" string into its parts.
# NOTE(review): the group names (name, evra, epoch, ver, rel, arch) are
# reconstructed -- confirm them against the code that consumes the match.
RE_NEVRA = re.compile(
    r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')


def remove_pkg(pkg, *args):
    """
    Remove package with rpm command.
    @param pkg is either instance of Package or package name
    @param args is a list of additional parameters for rpm command;
    they are placed before the "-e" option
    """
    if isinstance(pkg, rpmcache.Package):
        pkg = pkg.name
    call(["rpm", "--quiet"] + list(args) + ["-e", pkg])


def install_pkg(pkg, newer=True, repolist=None):
    """
    Install a specific package.
    @param pkg is either package name or instance of Package
    In latter case, a specific version is installed from a local rpm
    file; if that file is missing, yum is used with the package name.
    @param newer is passed to rpmcache.get_rpm_name() to select
    the rpm file of a Package instance
    @param repolist is a list of repositories, that should be
    used for downloading, if using yum
    when empty or None, all enabled repositories are used
    """
    # None instead of a shared mutable [] default argument
    if repolist is None:
        repolist = []
    if isinstance(pkg, rpmcache.Package):
        try:
            rpm_name = rpmcache.get_rpm_name(pkg, newer=newer)
        except rpmcache.MissingRPM:
            # no local rpm file available, fall back to yum below
            pass
        else:
            call(["rpm", "--quiet", "-i", rpm_name])
            return
        pkg = pkg.name
    rpmcache.run_yum('-q', '-y', 'install', pkg, repolist=repolist)


def is_installed(pkg, newer=True):
    """
    Check, whether package is installed.
    Accepts the same parameters as install_pkg.
    @see install_pkg
    @return result of rpmcache.is_installed(pkg, newer)
    """
    return rpmcache.is_installed(pkg, newer)


def verify_pkg(name):
    """
    Run rpm verification for package.
    @return True, if "rpm --quiet -Va <name>" exits with status 0
    """
    # NOTE(review): "-a" is passed together with a package name; confirm
    # that this is intended and not meant to be plain "-V <name>".
    return call(["rpm", "--quiet", "-Va", name]) == 0


def mark_dangerous(method):
    """
    Decorator for methods of unittest.TestCase subclasses, that
    skips dangerous tests, if environment variable does not allow them.
    Tests are allowed only when LMI_RUN_DANGEROUS is set to '1'.
    """
    if os.environ.get('LMI_RUN_DANGEROUS', '0') == '1':
        return method
    return unittest.skip("This test is marked as dangerous.")(method)


def _rpm_query_lines(pkg, option):
    """
    @param option is a query option of rpm command (e.g. '-qc')
    @return set of output lines of "rpm <option> <pkg.name>"
    """
    out = check_output(['rpm', option, pkg.name])
    return set(out.splitlines())  #pylint: disable=E1103


def is_config_file(pkg, file_path):
    """
    @return True, if file_path is a configuration file of package pkg.
    """
    return file_path in _rpm_query_lines(pkg, '-qc')


def is_doc_file(pkg, file_path):
    """
    @return True, if file_path is a documentation file of package pkg.
    """
    return file_path in _rpm_query_lines(pkg, '-qd')


def get_pkg_files(pkg):
    """
    Tries to make a the heterogenous and smallest set of test files from
    package. At most one representative of each kind is picked:
    configuration file, documentation file, directory and symlink.
    A plain file is added only if neither documentation file nor symlink
    was found.
    @param pkg must be installed package
    @return list of few files installed by pkg
    """
    output = check_output(['rpm', '-ql', pkg.name])
    # Query config and doc files once, not once per inspected file.
    config_files = _rpm_query_lines(pkg, '-qc')
    doc_files = _rpm_query_lines(pkg, '-qd')

    configs = set()
    docs = set()
    dirs = set()
    files = set()
    symlinks = set()
    for fpath in output.splitlines():  #pylint: disable=E1103
        is_link = os.path.islink(fpath)
        if not dirs and not is_link and os.path.isdir(fpath):
            dirs.add(fpath)
        elif not symlinks and is_link:
            symlinks.add(fpath)
        elif not is_link and os.path.isfile(fpath):
            if not configs and fpath in config_files:
                configs.add(fpath)
            elif not docs and fpath in doc_files:
                docs.add(fpath)
            elif not files:
                files.add(fpath)
    out = list(configs) + list(docs) + list(dirs) + list(symlinks)
    if files and not docs and not symlinks:
        out += list(files)
    return out


class SoftwareBaseTestCase(unittest.TestCase):  #pylint: disable=R0904
    """
    Base class for all LMI Software test classes.
    """

    # name of tested CIM class, used to build self.objpath in setUp()
    CLASS_NAME = "Define in subclass"

    def __init__(self, *args, **kwargs):
        unittest.TestCase.__init__(self, *args, **kwargs)
        # append custom messages to the default failure message
        self.longMessage = True

    def setUp(self):
        """
        Prepare self.objpath, an instance name of CLASS_NAME in
        root/cimv2 namespace.
        """
        unittest.TestCase.setUp(self)
        self.objpath = pywbem.CIMInstanceName(
                namespace="root/cimv2", classname=self.CLASS_NAME)

    def install_pkg(self, pkg, newer=True, repolist=None):
        """
        Install package with module level install_pkg().
        @param repolist when None, repositories from
        LMI_SOFTWARE_TEST_REPOS (self.test_repos) are used
        """
        if repolist is None:
            repolist = self.test_repos
        return install_pkg(pkg, newer, repolist)

    def assertIsSubclass(self, cls, base_cls):  #pylint: disable=C0103
        """
        Checks, whether cls is subclass of base_cls from CIM perspective.
        @param cls name of subclass
        @param base_cls name of base class
        Raises TypeError, if any of the arguments is not a string.
        """
        if not isinstance(cls, basestring):
            raise TypeError("cls must be a string")
        if not isinstance(base_cls, basestring):
            raise TypeError("base_cls must be a string")
        return self.assertTrue(pywbem.is_subclass(self.conn,
            "root/cimv2", base_cls, cls))

    def assertEqual(self, fst, snd, *args, **kwargs):
        """
        Compare two values like unittest.TestCase.assertEqual() does,
        with one exception: two instance names of LMI_SoftwarePackage
        with the same namespace, the same keys and equal Name,
        SoftwareElementID, SoftwareElementState and Version are accepted
        as equal without comparing their TargetOperatingSystem values,
        as long as both of these values are integers.
        @return True for the exception described above, otherwise
        result of unittest.TestCase.assertEqual()
        """
        if (   isinstance(fst, pywbem.CIMInstanceName)
           and isinstance(snd, pywbem.CIMInstanceName)
           and fst.classname == "LMI_SoftwarePackage"
           and fst.classname == snd.classname
           and fst.namespace == snd.namespace
           and fst.keys() == snd.keys()
           and all(fst[k] == snd[k] for k in ("Name", "SoftwareElementID",
                    "SoftwareElementState", "Version"))
           and isinstance(fst["TargetOperatingSystem"], (int, long))
           and isinstance(snd["TargetOperatingSystem"], (int, long))):
            return True
        return unittest.TestCase.assertEqual(
                self, fst, snd, *args, **kwargs)

    @classmethod
    def setUpClass(cls):
        """
        Read configuration from environment, connect to CIMOM and make
        sure, that all packages of test database are installed.

        Environment variables read:
            LMI_CIMOM_URL, LMI_CIMOM_USERNAME, LMI_CIMOM_PASSWORD,
            LMI_CIMOM_BROKER, LMI_RUN_DANGEROUS,
            LMI_SOFTWARE_TEST_REPOS (comma separated list),
            LMI_SOFTWARE_USE_CACHE, LMI_SOFTWARE_CACHE_DIR
        """
        cls.url = os.environ.get('LMI_CIMOM_URL', 'http://localhost:5988')
        cls.user = os.environ.get('LMI_CIMOM_USERNAME', '')
        cls.password = os.environ.get('LMI_CIMOM_PASSWORD', '')
        cls.broker = os.environ.get('LMI_CIMOM_BROKER', None)
        cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password))
        cls.run_dangerous = (
                os.environ.get('LMI_RUN_DANGEROUS', '0') == '1')
        cls.test_repos = os.environ.get('LMI_SOFTWARE_TEST_REPOS', '')
        if not cls.test_repos:
            cls.test_repos = []
        else:
            cls.test_repos = cls.test_repos.split(',')
        use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
        cls.cache_dir = None
        if use_cache:
            cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None)
            if cls.cache_dir is None:
                # NOTE(review): this temporary directory is never removed
                cls.cache_dir = tempfile.mkdtemp(suffix="software_database")
            if cls.cache_dir:
                # remembered, so that tearDownClass() can change back
                cls.prev_dir = os.getcwd()
                if not os.path.exists(cls.cache_dir):
                    os.makedirs(cls.cache_dir)
                # rpm packages are expected to be in CWD
                os.chdir(cls.cache_dir)
        cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache)
        for pkg in cls.pkgdb:
            if not is_installed(pkg.name):
                install_pkg(pkg, repolist=cls.test_repos)
        cls.pkg_files = dict((pkg.name, get_pkg_files(pkg))
                for pkg in cls.pkgdb)

    @classmethod
    def tearDownClass(cls):
        """
        Remove packages of test database, if dangerous tests are allowed,
        and change back to the directory active before setUpClass().
        """
        try:
            if cls.run_dangerous:
                for pkg in cls.pkgdb:
                    if is_installed(pkg.name):
                        remove_pkg(pkg.name)
        finally:
            # restore CWD even if the package removal above failed
            if hasattr(cls, "prev_dir"):
                os.chdir(cls.prev_dir)