From 33552073c3bc903d37382a80621517b07a6eb8eb Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Tue, 11 Dec 2012 11:05:12 +0100 Subject: fixed tests for f18 an RHEL7 --- src/software/test/common.py | 31 +++++++++++++-------- src/software/test/rpmcache.py | 63 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/software/test/common.py b/src/software/test/common.py index 7726d3e..0d199b2 100644 --- a/src/software/test/common.py +++ b/src/software/test/common.py @@ -51,10 +51,12 @@ def install_pkg(pkg, newer=True): In latter case, a specific version is installed. """ if isinstance(pkg, rpmcache.Package): - rpm_name = pkg.get_nevra(newer) + ".rpm" - if os.path.exists(rpm_name): + try: + rpm_name = rpmcache.get_rpm_name(pkg, newer=newer) subprocess.call(["rpm", "--quiet", "-i", rpm_name]) return + except rpmcache.MissingRPM: + pass pkg = pkg.name subprocess.call(["yum", "-q", "-y", "install", pkg]) @@ -68,9 +70,13 @@ def is_installed(pkg, newer=True): return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0 else: try: - out = subprocess.check_output( - ["rpm", "-q", "--qf", "%{NEVRA}", pkg.name]) - return out == pkg.get_nevra(newer) + cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}", pkg.name ] + out = subprocess.check_output(cmd) + epoch, nvra = out.split(':') #pylint: disable=E1103 + if not epoch or epoch == "(none)": + epoch = "0" + return ( epoch == pkg.epoch + and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER")) except subprocess.CalledProcessError: return False @@ -94,15 +100,17 @@ def is_config_file(pkg, file_path): """ @return True, if file_path is a configuration file of package pkg. """ - out = check_output(['rpm', '-qc', pkg.name]) - return file_path in set(out.splitlines()) + cmd = ['rpm', '-qc', pkg.name] + out = check_output(cmd) + return file_path in set(out.splitlines()) #pylint: disable=E1103 def is_doc_file(pkg, file_path): """ @return True, if file_path is a documentation file of package pkg. """ - out = check_output(['rpm', '-qd', pkg.name]) - return file_path in set(out.splitlines()) + cmd = ['rpm', '-qd', pkg.name] + out = check_output(cmd) + return file_path in set(out.splitlines()) #pylint: disable=E1103 def get_pkg_files(pkg): """ @@ -111,13 +119,14 @@ def get_pkg_files(pkg): @param pkg must be installed package @return list of few files installed by pkg """ - output = check_output(['rpm', '-ql', pkg.name]) + cmd = ['rpm', '-ql', pkg.name] + output = check_output(cmd) configs = set() docs = set() dirs = set() files = set() symlinks = set() - for fpath in output.splitlines(): + for fpath in output.splitlines(): #pylint: disable=E1103 if ( len(dirs) == 0 and not os.path.islink(fpath) and os.path.isdir(fpath)): diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py index 1e3a1ed..af0d47c 100644 --- a/src/software/test/rpmcache.py +++ b/src/software/test/rpmcache.py @@ -19,7 +19,7 @@ # Authors: Radek Novacek # Authors: Michal Minar """ -Creation and manipulation utilities with rpm cache for software tests. +Creation and manipulation utilities with rpm cache for software tests. """ import copy import datetime @@ -29,6 +29,18 @@ import re from collections import defaultdict from subprocess import call, check_output +class InvalidTestCache(Exception): + """Exception saying, that rpm test cache is not valiid.""" + pass +class MissingRPM(InvalidTestCache): + """ + Raised, when requested rpm for package is not contained in + rpm test cache. + """ + def __init__(self, pkg_name): + InvalidTestCache.__init__(self, + "Missing package '%s' in test cache!"%pkg_name) + def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): """ @param with_epoch may be one of: @@ -134,7 +146,7 @@ RE_PKG_INFO = re.compile( r'^Size\s*:\s*(?P\d+(\.\d+)?)( *(?P[kMG]))?', re.MULTILINE | re.DOTALL | re.IGNORECASE) RE_REPO = re.compile( - r'^[^\s]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE) + r'^[^\s/]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE) # maximum number of packages, that will be selected for testing MAX_PKG_DB_SIZE = 3 @@ -163,7 +175,7 @@ def _filter_duplicates(installed, avail_str): of Package. """ m2pkg = lambda m: Package(m.group('name'), - m.group('epoch') if m.group('epoch') else '0', + m.group('epoch')[:-1] if m.group('epoch') else '0', m.group('version'), m.group('release'), m.group('arch'), m.group('repository'), None, None, None, None) @@ -175,13 +187,13 @@ def _filter_duplicates(installed, avail_str): if ( _match_nevr(match) in [ _match_nevr(m) for m in cur_package_matches] or ( ( not prev_match - or prev_match.group('name') in + or prev_match.group('name') in [m.group('name') for m in cur_package_matches]) and match.group('arch') not in ('noarch', system_arch))): continue if prev_match and prev_match.group('name') != match.group('name'): if ( len(cur_package_matches) > 1 - and not match.group('name') in installed): + and not cur_package_matches[0].group('name') in installed): pkgs = [ m2pkg(m) for m in cur_package_matches ] dups_list.append(pkgs) cur_package_matches = [] @@ -231,13 +243,18 @@ def _sorted_db_by_size(pkgdb): # to get correct ordering from "yum info" command # { pkg_name : [(epoch, version, release), ... ] } pkg_version_order = defaultdict(list) + try: + header = "Available Packages\n" + info_str = info_str[info_str.index(header)+len(header):] + except ValueError: + pass for info_match in RE_PKG_INFO.finditer(info_str): pkg_name = info_match.group('name') size = float(info_match.group('size')) - units = info_match.group('units').lower() + units = info_match.group('units') if units: size *= defaultdict(lambda: 1, - {'k':10**3, 'm':10**6, 'g':10**9})[units] + {'k':10**3, 'm':10**6, 'g':10**9})[units.lower()] pkg_sizes[pkg_name] = size epoch = info_match.group('epoch') if not epoch: @@ -286,15 +303,39 @@ def _download_pkgdb(repolist, pkgdb, cache_dir=None): cmd.extend(pkgs) call(cmd) -def _make_rpm_path(pkg, cache_dir='', newer=True): +def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False): """ @param newer says, whether to use EVR of package to update (in this case, all epoch/ver/rel attributes will be prefixed with "up_") + @param without_epoch if True, epoch will be left out of package name @return path to rpm package made from instance of Package """ if not isinstance(pkg, Package): raise TypeError("pkg must be an instance of Package ") - return os.path.join(cache_dir, pkg.get_nevra(newer)) + '.rpm' + nevra = pkg.get_nevra(newer, + with_epoch='NEVER' if without_epoch else 'NOT_ZERO') + return os.path.join(cache_dir, nevra) + '.rpm' + +def rpm_exists(pkg, cache_dir='', newer=True): + """ + @return True, when rpm package is in cache. + """ + return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer)) + or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True))) + +def get_rpm_name(pkg, cache_dir='', newer=True): + """ + Some packages do not have epoch in their name, even if it's higher than + zero. That's why it's necessary to try more variants of rpm name. + @return rpm path to package in cache + """ + path = _make_rpm_path(pkg, cache_dir, newer) + if os.path.exists(path): + return path + path = _make_rpm_path(pkg, cache_dir, newer, True) + if os.path.exists(path): + return path + raise MissingRPM(pkg.name) def get_system_architecture(): """ @@ -332,8 +373,8 @@ def get_pkg_database(force_update=False, use_cache=True, cache_dir=''): pkgdb = load_pkgdb(cache_dir) valid_db = True for pkg in pkgdb: - if ( not os.path.exists(_make_rpm_path(pkg, cache_dir)) - or not os.path.exists(_make_rpm_path(pkg, cache_dir, True))): + if ( not rpm_exists(pkg, cache_dir, False) + or not rpm_exists(pkg, cache_dir, True)): valid_db = False #print "Old package database is not valid" break -- cgit