summaryrefslogtreecommitdiffstats
path: root/src/software/test/rpmcache.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/test/rpmcache.py')
-rw-r--r--src/software/test/rpmcache.py63
1 files changed, 52 insertions, 11 deletions
diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py
index 1e3a1ed..af0d47c 100644
--- a/src/software/test/rpmcache.py
+++ b/src/software/test/rpmcache.py
@@ -19,7 +19,7 @@
# Authors: Radek Novacek <rnovacek@redhat.com>
# Authors: Michal Minar <miminar@redhat.com>
"""
-Creation and manipulation utilities with rpm cache for software tests.
+Creation and manipulation utilities with rpm cache for software tests.
"""
import copy
import datetime
@@ -29,6 +29,18 @@ import re
from collections import defaultdict
from subprocess import call, check_output
+class InvalidTestCache(Exception):
+ """Exception saying, that rpm test cache is not valiid."""
+ pass
+class MissingRPM(InvalidTestCache):
+ """
+ Raised, when requested rpm for package is not contained in
+ rpm test cache.
+ """
+ def __init__(self, pkg_name):
+ InvalidTestCache.__init__(self,
+ "Missing package '%s' in test cache!"%pkg_name)
+
def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
"""
@param with_epoch may be one of:
@@ -134,7 +146,7 @@ RE_PKG_INFO = re.compile(
r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
re.MULTILINE | re.DOTALL | re.IGNORECASE)
RE_REPO = re.compile(
- r'^[^\s]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE)
+ r'^[^\s/]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE)
# maximum number of packages, that will be selected for testing
MAX_PKG_DB_SIZE = 3
@@ -163,7 +175,7 @@ def _filter_duplicates(installed, avail_str):
of Package.
"""
m2pkg = lambda m: Package(m.group('name'),
- m.group('epoch') if m.group('epoch') else '0',
+ m.group('epoch')[:-1] if m.group('epoch') else '0',
m.group('version'), m.group('release'),
m.group('arch'), m.group('repository'),
None, None, None, None)
@@ -175,13 +187,13 @@ def _filter_duplicates(installed, avail_str):
if ( _match_nevr(match) in [ _match_nevr(m)
for m in cur_package_matches]
or ( ( not prev_match
- or prev_match.group('name') in
+ or prev_match.group('name') in
[m.group('name') for m in cur_package_matches])
and match.group('arch') not in ('noarch', system_arch))):
continue
if prev_match and prev_match.group('name') != match.group('name'):
if ( len(cur_package_matches) > 1
- and not match.group('name') in installed):
+ and not cur_package_matches[0].group('name') in installed):
pkgs = [ m2pkg(m) for m in cur_package_matches ]
dups_list.append(pkgs)
cur_package_matches = []
@@ -231,13 +243,18 @@ def _sorted_db_by_size(pkgdb):
# to get correct ordering from "yum info" command
# { pkg_name : [(epoch, version, release), ... ] }
pkg_version_order = defaultdict(list)
+ try:
+ header = "Available Packages\n"
+ info_str = info_str[info_str.index(header)+len(header):]
+ except ValueError:
+ pass
for info_match in RE_PKG_INFO.finditer(info_str):
pkg_name = info_match.group('name')
size = float(info_match.group('size'))
- units = info_match.group('units').lower()
+ units = info_match.group('units')
if units:
size *= defaultdict(lambda: 1,
- {'k':10**3, 'm':10**6, 'g':10**9})[units]
+ {'k':10**3, 'm':10**6, 'g':10**9})[units.lower()]
pkg_sizes[pkg_name] = size
epoch = info_match.group('epoch')
if not epoch:
@@ -286,15 +303,39 @@ def _download_pkgdb(repolist, pkgdb, cache_dir=None):
cmd.extend(pkgs)
call(cmd)
-def _make_rpm_path(pkg, cache_dir='', newer=True):
+def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False):
"""
@param newer says, whether to use EVR of package to update
(in this case, all epoch/ver/rel attributes will be prefixed with "up_")
+ @param without_epoch if True, epoch will be left out of package name
@return path to rpm package made from instance of Package
"""
if not isinstance(pkg, Package):
raise TypeError("pkg must be an instance of Package ")
- return os.path.join(cache_dir, pkg.get_nevra(newer)) + '.rpm'
+ nevra = pkg.get_nevra(newer,
+ with_epoch='NEVER' if without_epoch else 'NOT_ZERO')
+ return os.path.join(cache_dir, nevra) + '.rpm'
+
+def rpm_exists(pkg, cache_dir='', newer=True):
+ """
+ @return True, when rpm package is in cache.
+ """
+ return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
+ or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
+
+def get_rpm_name(pkg, cache_dir='', newer=True):
+ """
+ Some packages do not have epoch in their name, even if it's higher than
+ zero. That's why it's necessary to try more variants of rpm name.
+ @return rpm path to package in cache
+ """
+ path = _make_rpm_path(pkg, cache_dir, newer)
+ if os.path.exists(path):
+ return path
+ path = _make_rpm_path(pkg, cache_dir, newer, True)
+ if os.path.exists(path):
+ return path
+ raise MissingRPM(pkg.name)
def get_system_architecture():
"""
@@ -332,8 +373,8 @@ def get_pkg_database(force_update=False, use_cache=True, cache_dir=''):
pkgdb = load_pkgdb(cache_dir)
valid_db = True
for pkg in pkgdb:
- if ( not os.path.exists(_make_rpm_path(pkg, cache_dir))
- or not os.path.exists(_make_rpm_path(pkg, cache_dir, True))):
+ if ( not rpm_exists(pkg, cache_dir, False)
+ or not rpm_exists(pkg, cache_dir, True)):
valid_db = False
#print "Old package database is not valid"
break