summaryrefslogtreecommitdiffstats
path: root/src/software/test/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/test/common.py')
-rw-r--r--src/software/test/common.py225
1 files changed, 148 insertions, 77 deletions
diff --git a/src/software/test/common.py b/src/software/test/common.py
index f61480a..7726d3e 100644
--- a/src/software/test/common.py
+++ b/src/software/test/common.py
@@ -1,4 +1,4 @@
-#
+# -*- encoding: utf-8 -*-
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
@@ -17,109 +17,143 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Common utilities and base class for all software tests.
+"""
-from collections import namedtuple
import os
import pywbem
import re
import subprocess
+import tempfile
import unittest
+from subprocess import check_output
+
+import rpmcache
-SCHEMA="http"
-HOSTNAME="localhost"
-PORT=5988
-USER=''
-PASSWORD=''
-
-Package = namedtuple("Package",
- "name, epoch, ver, rel, arch, "
- "updatable, up_epoch, up_ver, up_rel")
-# This is a database of packages used in tests. If you modify this, please
-# ensure, that all corresponding rpm packages are present in test directory.
-packages = [ Package(*args) for args in
- ( ( "python-xlib-doc", "0", "0.15", "0.6.rc1.fc17", "noarch", False
- , None, None, None)
- , ( "slv2", "0", "0.6.6", "8.fc17", "x86_64", True
- , "0", "0.6.6", "9.fc17")
- ) ]
-
-pkg_files = (
- ( "bash-completion-1:2.0-1.fc17.noarch",
- ( "/usr/share/bash-completion/completions/vgs" # symlink
- , "/usr/share/doc/bash-completion-2.0/README" # file
- , "/usr/share/doc/bash-completion-2.0" # directory
- )
- , )
- , )
-
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
+RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
- """
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
- """
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr): estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
-
-def remove_pkg(name, *args):
- subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", name])
-
-def install_pkg(*args, **kwargs):
- if len(args) > 1 or len(kwargs) > 0:
- if kwargs.has_key('nevra'):
- m = re_nevra.match(kwargs['nevra'])
- args = [ m.group(k) for k in (
- 'name', 'epoch', 'ver', 'rel', 'arch') ]
- nevra = make_nevra(*args, with_epoch="NEVER")
- rpm_name = nevra + ".rpm"
+def remove_pkg(pkg, *args):
+ """
+ Remove package with rpm command.
+ @param pkg is either instance of Package or package name
+ @param args is a list of parameters for rpm command
+ """
+ if isinstance(pkg, rpmcache.Package):
+ pkg = pkg.name
+ subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
+
+def install_pkg(pkg, newer=True):
+ """
+ Install a specific package.
+ @param pkg is either package name or instance of Package
+ In latter case, a specific version is installed.
+ """
+ if isinstance(pkg, rpmcache.Package):
+ rpm_name = pkg.get_nevra(newer) + ".rpm"
if os.path.exists(rpm_name):
subprocess.call(["rpm", "--quiet", "-i", rpm_name])
return
- subprocess.call(["yum", "-q", "-y", "install", args[0]])
+ pkg = pkg.name
+ subprocess.call(["yum", "-q", "-y", "install", pkg])
-def is_installed(*args, **kwargs):
- if len(args) == 1:
- return subprocess.call(["rpm", "--quiet", "-q", args[0]]) == 0
+def is_installed(pkg, newer=True):
+ """
+ Check, whether package is installed.
+ Accepts the same parameters as install_pkg.
+ @see install_pkg
+ """
+ if not isinstance(pkg, rpmcache.Package):
+ return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0
else:
- if kwargs.has_key("nevra"):
- nevra = kwargs["nevra"]
- name = re_nevra.match(nevra).group('name')
- else:
- nevra = make_nevra(*args)
- name = args[0]
try:
out = subprocess.check_output(
- ["rpm", "-q", "--qf", "%{NEVRA}", name])
- return out == nevra
+ ["rpm", "-q", "--qf", "%{NEVRA}", pkg.name])
+ return out == pkg.get_nevra(newer)
except subprocess.CalledProcessError:
return False
def verify_pkg(name):
+ """
+ @return output of command rpm, with verification output for package
+ """
return subprocess.call(["rpm", "--quiet", "-Va", name]) == 0
-class SoftwareBaseTestCase(unittest.TestCase):
+def mark_dangerous(method):
+ """
+ Decorator for methods of unittest.TestCase subclasses, that
+ skips dangerous tests, if environment variable does not allow them.
+ """
+ if os.environ.get('LMI_RUN_DANGEROUS', '0') == '1':
+ return method
+ else:
+ return unittest.skip("This test is marked as dangerous.")(method)
+
+def is_config_file(pkg, file_path):
+ """
+ @return True, if file_path is a configuration file of package pkg.
+ """
+ out = check_output(['rpm', '-qc', pkg.name])
+ return file_path in set(out.splitlines())
+
+def is_doc_file(pkg, file_path):
+ """
+ @return True, if file_path is a documentation file of package pkg.
+ """
+ out = check_output(['rpm', '-qd', pkg.name])
+ return file_path in set(out.splitlines())
+
+def get_pkg_files(pkg):
+ """
+ Tries to make a the heterogenous and smallest set of test files from
+ package.
+ @param pkg must be installed package
+ @return list of few files installed by pkg
+ """
+ output = check_output(['rpm', '-ql', pkg.name])
+ configs = set()
+ docs = set()
+ dirs = set()
+ files = set()
+ symlinks = set()
+ for fpath in output.splitlines():
+ if ( len(dirs) == 0
+ and not os.path.islink(fpath)
+ and os.path.isdir(fpath)):
+ dirs.add(fpath)
+ elif len(symlinks) == 0 and os.path.islink(fpath):
+ symlinks.add(fpath)
+ elif not os.path.islink(fpath) and os.path.isfile(fpath):
+ if len(configs) == 0 and is_config_file(pkg, fpath):
+ configs.add(fpath)
+ elif len(docs) == 0 and is_doc_file(pkg, fpath):
+ docs.add(fpath)
+ elif len(files) == 0:
+ files.add(fpath)
+ out = list(configs) + list(docs) + list(dirs) + list(symlinks)
+ if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0:
+ out += list(files)
+ return out
+
+class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
+ """
+ Base class for all LMI Software test classes.
+ """
+
+ CLASS_NAME = "Define in subclass"
def setUp(self):
unittest.TestCase.setUp(self)
- self.url = "%s://%s:%d" % (SCHEMA, HOSTNAME, PORT)
- self.conn = pywbem.WBEMConnection(self.url, (USER, PASSWORD))
- self.op = pywbem.CIMInstanceName(
+ self.objpath = pywbem.CIMInstanceName(
namespace="root/cimv2", classname=self.CLASS_NAME)
- def tearDown(self):
- del self.conn
-
- def assertIsSubclass(self, cls, base_cls):
+ def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
if not isinstance(cls, basestring):
raise TypeError("cls must be a string")
if not isinstance(base_cls, basestring):
@@ -127,3 +161,40 @@ class SoftwareBaseTestCase(unittest.TestCase):
return self.assertTrue(pywbem.is_subclass(self.conn,
"root/cimv2", base_cls, cls))
+ @classmethod
+ def setUpClass(cls):
+ cls.url = os.environ.get('LMI_CIMOM_URL', 'http://localhost:5988')
+ cls.user = os.environ.get('LMI_CIMOM_USERNAME', '')
+ cls.password = os.environ.get('LMI_CIMOM_PASSWORD', '')
+ cls.broker = os.environ.get('LMI_CIMOM_BROKER', None)
+ cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password))
+ cls.run_dangerous = (
+ os.environ.get('LMI_RUN_DANGEROUS', '0') == '1')
+ use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
+ cls.cache_dir = None
+ if use_cache:
+ cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None)
+ if cls.cache_dir is None:
+ cls.cache_dir = tempfile.mkdtemp(suffix="software_database")
+ if cls.cache_dir:
+ cls.prev_dir = os.getcwd()
+ if not os.path.exists(cls.cache_dir):
+ os.makedirs(cls.cache_dir)
+ # rpm packages are expected to be in CWD
+ os.chdir(cls.cache_dir)
+ cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache)
+ for pkg in cls.pkgdb:
+ if not is_installed(pkg.name):
+ install_pkg(pkg)
+ cls.pkg_files = dict((pkg.name, get_pkg_files(pkg))
+ for pkg in cls.pkgdb)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.run_dangerous:
+ for pkg in cls.pkgdb:
+ if is_installed(pkg.name):
+ remove_pkg(pkg.name)
+ if hasattr(cls, "prev_dir"):
+ os.chdir(cls.prev_dir)
+