#!/usr/bin/python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2012 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Radek Novacek
# Authors: Michal Minar
"""
Creation and manipulation utilities with rpm cache for software tests.
"""

import copy
import datetime
import os
import pickle
import re
from collections import defaultdict
from subprocess import call, check_output


class InvalidTestCache(Exception):
    """Exception saying, that rpm test cache is not valid."""
    pass


class MissingRPM(InvalidTestCache):
    """
    Raised, when requested rpm for package is not contained in
    rpm test cache.
    """
    def __init__(self, pkg_name):
        InvalidTestCache.__init__(self,
                "Missing package '%s' in test cache!" % pkg_name)


def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
    """
    Compose a "name-[epoch:]version-release.arch" string.

    @param epoch is expected to be a string (e.g. "0")
    @param with_epoch may be one of (compared case-insensitively):
        "NOT_ZERO" - include epoch only if it's not zero
        "ALWAYS"   - include epoch always
        "NEVER"    - do not include epoch at all
    @return nevra string
    """
    estr = ''
    mode = with_epoch.lower()
    if mode == "always" or (mode == "not_zero" and epoch != "0"):
        estr = epoch
    # an empty epoch string never gets the ':' separator
    if len(estr):
        estr += ":"
    return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)


class Package(object): #pylint: disable=R0902
    """
    Element of test package database. It's a container for package
    informations. It contains two sets of versions for single package.
    That's meant for updating tests.
    """

    def __init__(self, name, epoch, ver, rel, arch, repo,
            up_epoch, up_ver, up_rel, up_repo):
        """
        Arguments prefixed with 'up_' are for newer package.
        """
        self._name = name
        self._epoch = epoch
        self._ver = ver
        self._rel = rel
        self._arch = arch
        self._repo = repo
        self._up_epoch = up_epoch
        self._up_ver = up_ver
        self._up_rel = up_rel
        self._up_repo = up_repo

    @property
    def name(self):
        """@return package name"""
        return self._name

    @property
    def epoch(self):
        """@return epoch of the older version"""
        return self._epoch

    @property
    def ver(self):
        """@return version of the older version"""
        return self._ver

    @property
    def rel(self):
        """@return release of the older version"""
        return self._rel

    @property
    def arch(self):
        """@return package architecture"""
        return self._arch

    @property
    def repo(self):
        """@return repository of the older version"""
        return self._repo

    @property
    def nevra(self):
        """@return nevra string of the older version"""
        # Previously this returned get_nevra(True), which made it
        # indistinguishable from up_nevra.
        return self.get_nevra(False)

    @property
    def up_epoch(self):
        """@return epoch of the newer version"""
        return self._up_epoch

    @property
    def up_ver(self):
        """@return version of the newer version"""
        return self._up_ver

    @property
    def up_rel(self):
        """@return release of the newer version"""
        return self._up_rel

    @property
    def up_repo(self):
        """@return repository of the newer version"""
        return self._up_repo

    @property
    def up_nevra(self):
        """@return nevra string of the newer version"""
        return self.get_nevra(True)

    def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
        """
        @param newer if True, evr part is made from properties prefixed
            with 'up_'
        @param with_epoch is passed to make_nevra()
        @return pkg nevra string
        """
        if newer:
            attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
        else:
            attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
        return make_nevra(*[getattr(self, '_'+a) for a in attrs],
                with_epoch=with_epoch)


DB_BACKUP_FILE = 'lmi_software_test_cache'

# one line of "yum list available" output
RE_AVAIL_PKG = re.compile(
    r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
    r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
    r'-(?P<release>[a-zA-Z0-9_.]+)\s+'
    r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
# this won't match the last entry, unless "package\n" is not appended
# at the end of the string
RE_PKG_DEPS = re.compile(
    r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
    r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
    r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
    r'(?=^package|\Z)', re.MULTILINE | re.DOTALL)
RE_DEPS_PROVIDERS = re.compile(
    r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
    r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
    r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
RE_PKG_INFO = re.compile(
    r'^Name\s*:\s*(?P<name>[^\s]+).*?'
    r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?'
    r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+'
    r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?'
    r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
    re.MULTILINE | re.DOTALL | re.IGNORECASE)
# used with findall(), which returns the content of the single named group
RE_REPO = re.compile(
    r'(?:^\*?)(?P<name>[^\s/]+\b)(?!\s+id)', re.MULTILINE | re.IGNORECASE)

# maximum number of packages, that will be selected for testing
MAX_PKG_DB_SIZE = 3
# step used to iterate over package names used to check for their dependencies
# it's a number of packages, that will be passed to yum command at once
PKG_DEPS_ITER_STEP = 50


def _cmd_output(cmd):
    """
    Run a command and return its standard output as text.

    universal_newlines makes check_output return str instead of bytes
    on Python 3, so the module-level (str) regular expressions can be
    applied to the result.

    @param cmd is a list of command arguments
    @return command output as a string
    """
    return check_output(cmd, universal_newlines=True)


def _match_nevr(match):
    """
    @param match is a regexp match object with parsed rpm package
    @return tuple (name, epoch, version, release)
    """
    return ( match.group('name')
           , match.group('epoch')
           , match.group('version')
           , match.group('release'))


def _filter_duplicates(installed, avail_str):
    """
    Parse output of "yum list available" command and returns only those
    packages occurring in multiple versions, that are not installed.

    @param installed is a set of installed package names
    @param avail_str yum command output
    @return [ [pkg1v1, pkg1v2, ...], [pkg2v1, pkg2v2, ...], ... ]
       Each sublist of result contain at least 2 elements, that are instances
       of Package. Their 'up_' attributes are all None.
    """
    def to_package(match):
        """Make a Package (without 'up_' data) out of RE_AVAIL_PKG match."""
        epoch = match.group('epoch')
        return Package(match.group('name'),
                # the epoch group includes the trailing ':'
                epoch[:-1] if epoch else '0',
                match.group('version'), match.group('release'),
                match.group('arch'), match.group('repository'),
                None, None, None, None)

    dups_list = []

    def flush(matches):
        """Store a group of same-named matches, if it qualifies."""
        if len(matches) > 1 and matches[0].group('name') not in installed:
            dups_list.append([to_package(m) for m in matches])

    cur_package_matches = []
    prev_match = None
    system_arch = get_system_architecture()
    for match in RE_AVAIL_PKG.finditer(avail_str):
        # skip entries with already seen (name, epoch, version, release)
        # and entries built for a foreign architecture
        if (  _match_nevr(match) in [
                 _match_nevr(m) for m in cur_package_matches]
           or (   (  not prev_match
                  or prev_match.group('name') in
                        [m.group('name') for m in cur_package_matches])
              and match.group('arch') not in ('noarch', system_arch))):
            continue
        if prev_match and prev_match.group('name') != match.group('name'):
            # package name changed, close the group collected so far
            flush(cur_package_matches)
            cur_package_matches = []
        cur_package_matches.append(match)
        prev_match = match
    # the last group is subject to the same conditions as the others
    flush(cur_package_matches)
    return dups_list


def _check_pkg_dependencies(
        installed,
        dup_list,
        number_of_packages=MAX_PKG_DB_SIZE):
    """
    Finds packages from dup_list with satisfied (installed) dependencies.

    Packages are passed to "yum deplist" in batches of PKG_DEPS_ITER_STEP
    names. Batches are processed, until enough packages are found.

    @param installed is a set of installed package names
    @param dup_list is a list of lists of Package instances with common name
    @param number_of_packages is a number of packages, after which no other
        batch is processed
    @return filtered dup_list with at least number_of_packages elements
        (or less, if there are not enough suitable packages)
    """
    base_cmd = ['yum', 'deplist']
    dups_no_deps = []
    for i in range(0, len(dup_list), PKG_DEPS_ITER_STEP):
        dups_part = dup_list[i:i+PKG_DEPS_ITER_STEP]
        cmd = base_cmd + [dups[0].name for dups in dups_part]
        deplist_str = _cmd_output(cmd)
        # NOTE(review): this pairs the n-th requested package with the n-th
        # "package:" entry of the output - it relies on yum printing exactly
        # one entry per requested name in the same order; confirm
        for pkgs, match_pkg in zip(dups_part,
                RE_PKG_DEPS.finditer(deplist_str)):
            providers = RE_DEPS_PROVIDERS.finditer(
                    match_pkg.group('dep_list'))
            if all(m.group('name') in installed for m in providers):
                dups_no_deps.append(pkgs)
        if len(dups_no_deps) >= number_of_packages:
            break
    return dups_no_deps


def _sorted_db_by_size(pkgdb):
    """
    Selects at most MAX_PKG_DB_SIZE smallest packages and merges
    the versions of each one into single instance of Package.

    @param pkgdb is a list of lists of packages with common name
    @return sorted instances of Package according to their size;
        attributes of the first version in "yum info" ordering are used
        for the older package and those of the last one for 'up_' attributes
    """
    if not pkgdb:
        # without any package name yum would list info for everything
        return []
    cmd = ['yum', 'info', '--showduplicates']
    cmd.extend([ps[0].name for ps in pkgdb])
    info_str = _cmd_output(cmd)
    pkg_sizes = {}
    # to get correct ordering from "yum info" command
    # { pkg_name : [(epoch, version, release), ... ] }
    pkg_version_order = defaultdict(list)
    try:
        # skip everything up to the list of available packages
        header = "Available Packages\n"
        info_str = info_str[info_str.index(header)+len(header):]
    except ValueError:
        pass
    multipliers = {'k': 10**3, 'm': 10**6, 'g': 10**9}
    for info_match in RE_PKG_INFO.finditer(info_str):
        pkg_name = info_match.group('name')
        size = float(info_match.group('size'))
        units = info_match.group('units')
        if units:
            size *= multipliers.get(units.lower(), 1)
        pkg_sizes[pkg_name] = size
        epoch = info_match.group('epoch')
        if not epoch:
            epoch = "0"
        pkg_version_order[pkg_name].append((
            epoch, info_match.group('version'), info_match.group('release')))
    pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[
            :MAX_PKG_DB_SIZE]

    for i, pkgs in enumerate(pkgdb):
        order = pkg_version_order[pkgs[0].name]
        pkgs = sorted(pkgs,
                key=lambda p: order.index((p.epoch, p.ver, p.rel)))
        oldest, newest = pkgs[0], pkgs[-1]
        pkg_kwargs = dict((k, getattr(oldest, k)) for k in ('name', 'arch'))
        for attr in ('epoch', 'ver', 'rel', 'repo'):
            pkg_kwargs[attr] = getattr(oldest, attr)
            pkg_kwargs['up_'+attr] = getattr(newest, attr)
        pkgdb[i] = Package(**pkg_kwargs)
    return pkgdb


def _get_repo_list():
    """
    @return list of software repository names
    """
    repos_str = _cmd_output(['yum', 'repolist', '-q'])
    return RE_REPO.findall(repos_str)


def _download_pkgdb(repolist, pkgdb, cache_dir=None):
    """
    Downloads all rpm packages (old and newer versions) from package database
    to cache_dir (or to current directory, if cache_dir is not given).

    @param repolist is a list of all repository names
    @param pkgdb is a list of instances of Package
    """
    repo_pkgs = defaultdict(list)
    for pkg in pkgdb:
        repo_pkgs[pkg.repo].append(pkg.name)
        repo_pkgs[pkg.up_repo].append(pkg.name)
    base_cmd = ['yumdownloader']
    if cache_dir:
        base_cmd.extend(['--destdir', cache_dir])
    for repo, pkgs in repo_pkgs.items():
        cmd = copy.copy(base_cmd)
        # allow only the repository, the package version comes from
        repos = set(repolist)
        # discard() - the repository may be missing in repolist
        repos.discard(repo)
        for not_allowed_repo in repos:
            cmd.append('--disablerepo='+not_allowed_repo)
        cmd.append('--enablerepo='+repo)
        cmd.extend(pkgs)
        call(cmd)


def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False):
    """
    @param newer says, whether to use EVR of package to update
        (in this case, all epoch/ver/rel attributes will be prefixed with "up_")
    @param without_epoch if True, epoch will be left out of package name
    @return path to rpm package made from instance of Package
    """
    if not isinstance(pkg, Package):
        raise TypeError("pkg must be an instance of Package ")
    nevra = pkg.get_nevra(newer,
            with_epoch='NEVER' if without_epoch else 'NOT_ZERO')
    return os.path.join(cache_dir, nevra) + '.rpm'


def rpm_exists(pkg, cache_dir='', newer=True):
    """
    @return True, when rpm package is in cache (its file name may or may not
        contain the epoch).
    """
    return (  os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
           or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))


def get_rpm_name(pkg, cache_dir='', newer=True):
    """
    Some packages do not have epoch in their name, even if it's higher than
    zero. That's why it's necessary to try more variants of rpm name.

    @return rpm path to package in cache
    @raise MissingRPM if no variant of rpm name exists in cache
    """
    for without_epoch in (False, True):
        path = _make_rpm_path(pkg, cache_dir, newer, without_epoch)
        if os.path.exists(path):
            return path
    raise MissingRPM(pkg.name)


def get_system_architecture():
    """
    @return the system architecture name as seen by rpm
    """
    # the query format ends with a newline, which must not be a part of
    # the result (it's compared to architectures of packages)
    return _cmd_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm']).strip()


def write_pkgdb(pkgdb, cache_dir=''):
    """
    Writes package database into a file named DB_BACKUP_FILE
    together with current date and time.
    """
    # pickle data must be written to a binary file
    with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'wb') as db_file:
        pickle.dump((datetime.datetime.now(), pkgdb), db_file)


def load_pkgdb(cache_dir=''):
    """
    This is inverse function to write_pkgdb().

    @return package database loaded from file
    """
    # NOTE: unpickling can execute arbitrary code; the cache file must come
    # from a trusted location only
    with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'rb') as db_file:
        # the first item is a time stamp of database creation
        _, pkgdb = pickle.load(db_file)
    return pkgdb


def get_pkg_database(force_update=False, use_cache=True, cache_dir=''):
    """
    Checks yum database for available packages, that have at least two
    different versions in repositories. Only not installed ones with
    all of their dependencies installed are selected.
    And from those, few of the smallest are downloaded as rpms.

    @param force_update if True, stored database is ignored
    @param use_cache if False, database is neither loaded from nor stored to
        cache_dir and no rpms are downloaded
    @return list of instances of Package of selected packages
    """
    if (   use_cache and not force_update
       and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
        pkgdb = load_pkgdb(cache_dir)
        # stored database is valid only if rpms of both versions of every
        # package are present in cache
        if all(   rpm_exists(pkg, cache_dir, False)
              and rpm_exists(pkg, cache_dir, True)
              for pkg in pkgdb):
            return pkgdb
    # get names of installed packages
    installed = set(_cmd_output(
        ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
    # get all available packages
    avail_str = _cmd_output(['yum', 'list', 'available', '--showduplicates'])
    # list of lists of packages with the same name, longer than 2
    dups_list = _filter_duplicates(installed, avail_str)
    # select only those with installed dependencies
    selected = _check_pkg_dependencies(installed, dups_list,
            number_of_packages=MAX_PKG_DB_SIZE*5)
    # select the smallest ones
    pkgdb = _sorted_db_by_size(selected)
    if use_cache:
        repolist = _get_repo_list()
        _download_pkgdb(repolist, pkgdb, cache_dir)
        # back up database information
        write_pkgdb(pkgdb, cache_dir)
    return pkgdb