summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2012-12-07 10:25:07 +0100
committerMichal Minar <miminar@redhat.com>2012-12-07 10:25:07 +0100
commitee01a7c83b0fb9fcf0555007baeaad9fa421df63 (patch)
tree6357ca9dce2c4ec0973f2a93c06b025bb467625b
parentf1a2bbd99380602bc4d33833f1bc78e38899ae08 (diff)
downloadopenlmi-providers-ee01a7c83b0fb9fcf0555007baeaad9fa421df63.tar.gz
openlmi-providers-ee01a7c83b0fb9fcf0555007baeaad9fa421df63.tar.xz
openlmi-providers-ee01a7c83b0fb9fcf0555007baeaad9fa421df63.zip
rewritten tests for software management
All tests are invocable from single script called "run_tests.py" and as standalone-modules as well. Its behaviour is modifiable through environment variables according to TestGuidelines. run_tests.py can be passed arguments instead. Package database is not fixed anymore. Instead it's generated automatically for system, where tests are triggered.
-rw-r--r--src/software/test/README31
-rw-r--r--src/software/test/common.py225
-rw-r--r--src/software/test/rpmcache.py361
-rwxr-xr-xsrc/software/test/run_tests.py257
-rwxr-xr-xsrc/software/test/test_software_file_check.py261
-rwxr-xr-xsrc/software/test/test_software_installed_package.py278
-rwxr-xr-xsrc/software/test/test_software_package.py121
7 files changed, 1195 insertions, 339 deletions
diff --git a/src/software/test/README b/src/software/test/README
index d60d1e8..a64382d 100644
--- a/src/software/test/README
+++ b/src/software/test/README
@@ -1,5 +1,26 @@
-Tests must be run as root on machine with sfcbd broker with
-test dir as CWD. File common.py contains connection settings and
-also test data.
-This directory should contain all rpm packages, that will take part in testing
-to speed it up. These packages are listed in common.py file.
+Tests must be run as root on machine with sfcbd broker.
+There are number of environment variables, that affect test running:
+ LMI_CIMOM_USERNAME
+ LMI_CIMOM_PASSWORD
+ LMI_CIMOM_BROKER
+ LMI_CIMOM_URL
+ LMI_RUN_DANGEROUS - valid values: 0, 1
+ LMI_SOFTWARE_USE_CACHE - valid values: 0, 1
+ - says, whether to use cache to download rpm packages, which will
+ be used to speed up testing
+ LMI_SOFTWARE_CACHE_DIR
+ - directory, where to store rpm packages and other information
+ - if not set and LMI_SOFTWARE_USE_CACHE == '1', a temporary
+ directory will be created and deleted, when tests are
+ complete
+
+Simple usage:
+ Each test module can be run as a stand-alone script or with run_tests.py
+ script:
+ $ ./run_tests.py -c --cache-dir=/var/tmp
+ To pass any arguments to underlying unittest.main() function, append them
+ after "--" swith like this:
+ $ ./run_tests.py -c --cache-dir=/var/tmp -- -v TestSoftwarePackage
+ All environment variables defined above can be overriden by command-line
+ arguments.
+
diff --git a/src/software/test/common.py b/src/software/test/common.py
index f61480a..7726d3e 100644
--- a/src/software/test/common.py
+++ b/src/software/test/common.py
@@ -1,4 +1,4 @@
-#
+# -*- encoding: utf-8 -*-
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
@@ -17,109 +17,143 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Common utilities and base class for all software tests.
+"""
-from collections import namedtuple
import os
import pywbem
import re
import subprocess
+import tempfile
import unittest
+from subprocess import check_output
+
+import rpmcache
-SCHEMA="http"
-HOSTNAME="localhost"
-PORT=5988
-USER=''
-PASSWORD=''
-
-Package = namedtuple("Package",
- "name, epoch, ver, rel, arch, "
- "updatable, up_epoch, up_ver, up_rel")
-# This is a database of packages used in tests. If you modify this, please
-# ensure, that all corresponding rpm packages are present in test directory.
-packages = [ Package(*args) for args in
- ( ( "python-xlib-doc", "0", "0.15", "0.6.rc1.fc17", "noarch", False
- , None, None, None)
- , ( "slv2", "0", "0.6.6", "8.fc17", "x86_64", True
- , "0", "0.6.6", "9.fc17")
- ) ]
-
-pkg_files = (
- ( "bash-completion-1:2.0-1.fc17.noarch",
- ( "/usr/share/bash-completion/completions/vgs" # symlink
- , "/usr/share/doc/bash-completion-2.0/README" # file
- , "/usr/share/doc/bash-completion-2.0" # directory
- )
- , )
- , )
-
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
+RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
- """
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
- """
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr): estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
-
-def remove_pkg(name, *args):
- subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", name])
-
-def install_pkg(*args, **kwargs):
- if len(args) > 1 or len(kwargs) > 0:
- if kwargs.has_key('nevra'):
- m = re_nevra.match(kwargs['nevra'])
- args = [ m.group(k) for k in (
- 'name', 'epoch', 'ver', 'rel', 'arch') ]
- nevra = make_nevra(*args, with_epoch="NEVER")
- rpm_name = nevra + ".rpm"
+def remove_pkg(pkg, *args):
+ """
+ Remove package with rpm command.
+ @param pkg is either instance of Package or package name
+ @param args is a list of parameters for rpm command
+ """
+ if isinstance(pkg, rpmcache.Package):
+ pkg = pkg.name
+ subprocess.call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
+
+def install_pkg(pkg, newer=True):
+ """
+ Install a specific package.
+ @param pkg is either package name or instance of Package
+ In latter case, a specific version is installed.
+ """
+ if isinstance(pkg, rpmcache.Package):
+ rpm_name = pkg.get_nevra(newer) + ".rpm"
if os.path.exists(rpm_name):
subprocess.call(["rpm", "--quiet", "-i", rpm_name])
return
- subprocess.call(["yum", "-q", "-y", "install", args[0]])
+ pkg = pkg.name
+ subprocess.call(["yum", "-q", "-y", "install", pkg])
-def is_installed(*args, **kwargs):
- if len(args) == 1:
- return subprocess.call(["rpm", "--quiet", "-q", args[0]]) == 0
+def is_installed(pkg, newer=True):
+ """
+ Check, whether package is installed.
+ Accepts the same parameters as install_pkg.
+ @see install_pkg
+ """
+ if not isinstance(pkg, rpmcache.Package):
+ return subprocess.call(["rpm", "--quiet", "-q", pkg]) == 0
else:
- if kwargs.has_key("nevra"):
- nevra = kwargs["nevra"]
- name = re_nevra.match(nevra).group('name')
- else:
- nevra = make_nevra(*args)
- name = args[0]
try:
out = subprocess.check_output(
- ["rpm", "-q", "--qf", "%{NEVRA}", name])
- return out == nevra
+ ["rpm", "-q", "--qf", "%{NEVRA}", pkg.name])
+ return out == pkg.get_nevra(newer)
except subprocess.CalledProcessError:
return False
def verify_pkg(name):
+ """
+ @return output of command rpm, with verification output for package
+ """
return subprocess.call(["rpm", "--quiet", "-Va", name]) == 0
-class SoftwareBaseTestCase(unittest.TestCase):
+def mark_dangerous(method):
+ """
+ Decorator for methods of unittest.TestCase subclasses, that
+ skips dangerous tests, if environment variable does not allow them.
+ """
+ if os.environ.get('LMI_RUN_DANGEROUS', '0') == '1':
+ return method
+ else:
+ return unittest.skip("This test is marked as dangerous.")(method)
+
+def is_config_file(pkg, file_path):
+ """
+ @return True, if file_path is a configuration file of package pkg.
+ """
+ out = check_output(['rpm', '-qc', pkg.name])
+ return file_path in set(out.splitlines())
+
+def is_doc_file(pkg, file_path):
+ """
+ @return True, if file_path is a documentation file of package pkg.
+ """
+ out = check_output(['rpm', '-qd', pkg.name])
+ return file_path in set(out.splitlines())
+
+def get_pkg_files(pkg):
+ """
+ Tries to make a the heterogenous and smallest set of test files from
+ package.
+ @param pkg must be installed package
+ @return list of few files installed by pkg
+ """
+ output = check_output(['rpm', '-ql', pkg.name])
+ configs = set()
+ docs = set()
+ dirs = set()
+ files = set()
+ symlinks = set()
+ for fpath in output.splitlines():
+ if ( len(dirs) == 0
+ and not os.path.islink(fpath)
+ and os.path.isdir(fpath)):
+ dirs.add(fpath)
+ elif len(symlinks) == 0 and os.path.islink(fpath):
+ symlinks.add(fpath)
+ elif not os.path.islink(fpath) and os.path.isfile(fpath):
+ if len(configs) == 0 and is_config_file(pkg, fpath):
+ configs.add(fpath)
+ elif len(docs) == 0 and is_doc_file(pkg, fpath):
+ docs.add(fpath)
+ elif len(files) == 0:
+ files.add(fpath)
+ out = list(configs) + list(docs) + list(dirs) + list(symlinks)
+ if len(files) > 0 and len(docs) == 0 and len(symlinks) == 0:
+ out += list(files)
+ return out
+
+class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
+ """
+ Base class for all LMI Software test classes.
+ """
+
+ CLASS_NAME = "Define in subclass"
def setUp(self):
unittest.TestCase.setUp(self)
- self.url = "%s://%s:%d" % (SCHEMA, HOSTNAME, PORT)
- self.conn = pywbem.WBEMConnection(self.url, (USER, PASSWORD))
- self.op = pywbem.CIMInstanceName(
+ self.objpath = pywbem.CIMInstanceName(
namespace="root/cimv2", classname=self.CLASS_NAME)
- def tearDown(self):
- del self.conn
-
- def assertIsSubclass(self, cls, base_cls):
+ def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103
+ """
+ Checks, whether cls is subclass of base_cls from CIM perspective.
+ @param cls name of subclass
+ @param base_cls name of base class
+ """
if not isinstance(cls, basestring):
raise TypeError("cls must be a string")
if not isinstance(base_cls, basestring):
@@ -127,3 +161,40 @@ class SoftwareBaseTestCase(unittest.TestCase):
return self.assertTrue(pywbem.is_subclass(self.conn,
"root/cimv2", base_cls, cls))
+ @classmethod
+ def setUpClass(cls):
+ cls.url = os.environ.get('LMI_CIMOM_URL', 'http://localhost:5988')
+ cls.user = os.environ.get('LMI_CIMOM_USERNAME', '')
+ cls.password = os.environ.get('LMI_CIMOM_PASSWORD', '')
+ cls.broker = os.environ.get('LMI_CIMOM_BROKER', None)
+ cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password))
+ cls.run_dangerous = (
+ os.environ.get('LMI_RUN_DANGEROUS', '0') == '1')
+ use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
+ cls.cache_dir = None
+ if use_cache:
+ cls.cache_dir = os.environ.get('LMI_SOFTWARE_CACHE_DIR', None)
+ if cls.cache_dir is None:
+ cls.cache_dir = tempfile.mkdtemp(suffix="software_database")
+ if cls.cache_dir:
+ cls.prev_dir = os.getcwd()
+ if not os.path.exists(cls.cache_dir):
+ os.makedirs(cls.cache_dir)
+ # rpm packages are expected to be in CWD
+ os.chdir(cls.cache_dir)
+ cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache)
+ for pkg in cls.pkgdb:
+ if not is_installed(pkg.name):
+ install_pkg(pkg)
+ cls.pkg_files = dict((pkg.name, get_pkg_files(pkg))
+ for pkg in cls.pkgdb)
+
+ @classmethod
+ def tearDownClass(cls):
+ if cls.run_dangerous:
+ for pkg in cls.pkgdb:
+ if is_installed(pkg.name):
+ remove_pkg(pkg.name)
+ if hasattr(cls, "prev_dir"):
+ os.chdir(cls.prev_dir)
+
diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py
new file mode 100644
index 0000000..1e3a1ed
--- /dev/null
+++ b/src/software/test/rpmcache.py
@@ -0,0 +1,361 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Radek Novacek <rnovacek@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+"""
+Creation and manipulation utilities with rpm cache for software tests.
+"""
+import copy
+import datetime
+import os
+import pickle
+import re
+from collections import defaultdict
+from subprocess import call, check_output
+
+def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
+ """
+ @param with_epoch may be one of:
+ "NOT_ZERO" - include epoch only if it's not zero
+ "ALWAYS" - include epoch always
+ "NEVER" - do not include epoch at all
+ """
+ estr = ''
+ if with_epoch.lower() == "always":
+ estr = epoch
+ elif with_epoch.lower() == "not_zero":
+ if epoch != "0":
+ estr = epoch
+ if len(estr):
+ estr += ":"
+ return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
+
+class Package(object): #pylint: disable=R0902
+ """
+ Element of test package database. It's a container for package
+ informations. It contains two sets of versions for single package.
+ That's meant for updating tests.
+ """
+ def __init__(self, name, epoch, ver, rel, arch, repo,
+ up_epoch, up_ver, up_rel, up_repo):
+ """
+ Arguments prefixed with 'up_' are for newer package.
+ """
+ self._name = name
+ self._epoch = epoch
+ self._ver = ver
+ self._rel = rel
+ self._arch = arch
+ self._repo = repo
+ self._up_epoch = up_epoch
+ self._up_ver = up_ver
+ self._up_rel = up_rel
+ self._up_repo = up_repo
+
+ @property
+ def name(self): return self._name #pylint: disable=C0111,C0321
+ @property
+ def epoch(self): return self._epoch #pylint: disable=C0111,C0321
+ @property
+ def ver(self): return self._ver #pylint: disable=C0111,C0321
+ @property
+ def rel(self): return self._rel #pylint: disable=C0111,C0321
+ @property
+ def arch(self): return self._arch #pylint: disable=C0111,C0321
+ @property
+ def repo(self): return self._repo #pylint: disable=C0111,C0321
+ @property
+ def nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(True)
+
+ @property
+ def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
+ @property
+ def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
+ @property
+ def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
+ @property
+ def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
+ @property
+ def up_nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(True)
+
+ def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
+ """
+ @newer if True, evr part is made from properties prefixed with 'up_'
+ @return pkg nevra string
+ """
+ if newer:
+ attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
+ else:
+ attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
+ return make_nevra(*[getattr(self, '_'+a) for a in attrs],
+ with_epoch=with_epoch)
+
+DB_BACKUP_FILE = 'lmi_software_test_cache'
+
+RE_AVAIL_PKG = re.compile(
+ r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
+ r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
+ r'-(?P<release>[a-zA-Z0-9_.]+)\s+'
+ r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
+# this won't match the last entry, unless "package\n" is not appended
+# at the end of the string
+RE_PKG_DEPS = re.compile(
+ r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
+ r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
+ r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
+ r'(?=^package|\Z)', re.MULTILINE | re.DOTALL)
+RE_DEPS_PROVIDERS = re.compile(
+ r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
+ r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
+ r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
+RE_PKG_INFO = re.compile(
+ r'^Name\s*:\s*(?P<name>[^\s]+).*?'
+ r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?'
+ r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+'
+ r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?'
+ r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
+ re.MULTILINE | re.DOTALL | re.IGNORECASE)
+RE_REPO = re.compile(
+ r'^[^\s]+\b(?!\s+id)', re.MULTILINE | re.IGNORECASE)
+
+# maximum number of packages, that will be selected for testing
+MAX_PKG_DB_SIZE = 3
+# step used to iterate over package names used to check for thery dependencies
+# it's a number of packages, that will be passed to yum command at once
+PKG_DEPS_ITER_STEP = 50
+
+def _match_nevr(match):
+ """
+ @param match is a regexp match object with parsed rpm package
+ @return tuple (name, epoch, version, release)
+ """
+ return ( match.group('name')
+ , match.group('epoch')
+ , match.group('version')
+ , match.group('release'))
+
+def _filter_duplicates(installed, avail_str):
+ """
+ Parse output of "yum list available" command and retuns only those
+ packages occuring in multiple versions.
+ @param installed is a set of installed package names
+ @param avail_str yum command output
+ @return [ [pkg1v1, pkg1v2, ...], [pkg2v1, pkg2v2, ...], ... ]
+ Each sublist of result contain at least 2 elements, that are instances
+ of Package.
+ """
+ m2pkg = lambda m: Package(m.group('name'),
+ m.group('epoch') if m.group('epoch') else '0',
+ m.group('version'), m.group('release'),
+ m.group('arch'), m.group('repository'),
+ None, None, None, None)
+ dups_list = []
+ cur_package_matches = []
+ prev_match = None
+ system_arch = get_system_architecture()
+ for match in RE_AVAIL_PKG.finditer(avail_str):
+ if ( _match_nevr(match) in [ _match_nevr(m)
+ for m in cur_package_matches]
+ or ( ( not prev_match
+ or prev_match.group('name') in
+ [m.group('name') for m in cur_package_matches])
+ and match.group('arch') not in ('noarch', system_arch))):
+ continue
+ if prev_match and prev_match.group('name') != match.group('name'):
+ if ( len(cur_package_matches) > 1
+ and not match.group('name') in installed):
+ pkgs = [ m2pkg(m) for m in cur_package_matches ]
+ dups_list.append(pkgs)
+ cur_package_matches = []
+ cur_package_matches.append(match)
+ prev_match = match
+ if len(cur_package_matches) > 1:
+ dups_list.append([ m2pkg(m) for m in cur_package_matches ])
+ return dups_list
+
+def _check_pkg_dependencies(
+ installed,
+ dup_list,
+ number_of_packages=MAX_PKG_DB_SIZE):
+ """
+ Finds packages from dup_list with satisfied (installed) dependencies.
+ @param installed is a set of installed package names
+ @return filtered dup_list with at least number_of_packages elements.
+ """
+ cmd = ['yum', 'deplist']
+ dups_no_deps = []
+ for i in range(0, len(dup_list), PKG_DEPS_ITER_STEP):
+ dups_part = dup_list[i:i+PKG_DEPS_ITER_STEP]
+ for dups in dups_part:
+ cmd.append(dups[0].name)
+ deplist_str = check_output(cmd)
+ for pkgs, match_pkg in zip(dups_part,
+ RE_PKG_DEPS.finditer(deplist_str)):
+ for match_dep in RE_DEPS_PROVIDERS.finditer(
+ match_pkg.group('dep_list')):
+ if match_dep.group('name') not in installed:
+ break
+ else:
+ dups_no_deps.append(pkgs)
+ if len(dups_no_deps) >= number_of_packages:
+ break
+ return dups_no_deps
+
+def _sorted_db_by_size(pkgdb):
+ """
+ @param pkgdb is a list of lists of packages with common name
+ @return sorted instances of Package according to their size
+ """
+ cmd = ['yum', 'info', '--showduplicates']
+ cmd.extend([ps[0].name for ps in pkgdb])
+ info_str = check_output(cmd)
+ pkg_sizes = {}
+ # to get correct ordering from "yum info" command
+ # { pkg_name : [(epoch, version, release), ... ] }
+ pkg_version_order = defaultdict(list)
+ for info_match in RE_PKG_INFO.finditer(info_str):
+ pkg_name = info_match.group('name')
+ size = float(info_match.group('size'))
+ units = info_match.group('units').lower()
+ if units:
+ size *= defaultdict(lambda: 1,
+ {'k':10**3, 'm':10**6, 'g':10**9})[units]
+ pkg_sizes[pkg_name] = size
+ epoch = info_match.group('epoch')
+ if not epoch:
+ epoch = "0"
+ pkg_version_order[pkg_name].append((
+ epoch, info_match.group('version'), info_match.group('release')))
+ pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[
+ :MAX_PKG_DB_SIZE]
+
+ for i, pkgs in enumerate(pkgdb):
+ pkgs = sorted(pkgs, key=lambda p:
+ pkg_version_order[pkgs[0].name].index((p.epoch, p.ver, p.rel)))
+ pkg_kwargs = dict((k, getattr(pkgs[0], k)) for k in ('name', 'arch') )
+ for attr in ('epoch', 'ver', 'rel', 'repo'):
+ pkg_kwargs[attr] = getattr(pkgs[0], attr)
+ pkg_kwargs['up_'+attr] = getattr(pkgs[-1], attr)
+ pkgdb[i] = Package(**pkg_kwargs)
+ return pkgdb
+
+def _get_repo_list():
+ """
+ @return list of software repository names
+ """
+ repos_str = check_output(['yum', 'repolist', '-q'])
+ return RE_REPO.findall(repos_str)
+
+def _download_pkgdb(repolist, pkgdb, cache_dir=None):
+ """
+ Downloads all rpm packages (old and newer versions) from package database
+ to current directory.
+ """
+ repo_pkgs = defaultdict(list)
+ for pkg in pkgdb:
+ repo_pkgs[pkg.repo].append(pkg.name)
+ repo_pkgs[pkg.up_repo].append(pkg.name)
+ base_cmd = ['yumdownloader']
+ if cache_dir:
+ base_cmd.extend(['--destdir', cache_dir])
+ for repo, pkgs in repo_pkgs.items():
+ cmd = copy.copy(base_cmd)
+ repos = set(repolist)
+ repos.remove(repo)
+ for not_allowed_repo in repos:
+ cmd.append('--disablerepo='+not_allowed_repo)
+ cmd.append('--enablerepo='+repo)
+ cmd.extend(pkgs)
+ call(cmd)
+
+def _make_rpm_path(pkg, cache_dir='', newer=True):
+ """
+ @param newer says, whether to use EVR of package to update
+ (in this case, all epoch/ver/rel attributes will be prefixed with "up_")
+ @return path to rpm package made from instance of Package
+ """
+ if not isinstance(pkg, Package):
+ raise TypeError("pkg must be an instance of Package ")
+ return os.path.join(cache_dir, pkg.get_nevra(newer)) + '.rpm'
+
+def get_system_architecture():
+ """
+ @return the system architecture name as seen by rpm
+ """
+ return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm'])
+
+def write_pkgdb(pkgdb, cache_dir=''):
+ """
+ Writes package database into a file named DB_BACKUP_FILE.
+ """
+ with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file:
+ pickle.dump((datetime.datetime.now(), pkgdb), db_file)
+
+def load_pkgdb(cache_dir=''):
+ """
+ This is inverse function to _write_pkgdb().
+ @return package database loaded from file
+ """
+ with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file:
+ date_time, pkgdb = pickle.load(db_file)
+ #print "Loaded package database from: %s" % date_time
+ return pkgdb
+
+def get_pkg_database(force_update=False, use_cache=True, cache_dir=''):
+ """
+ Checks yum database for available packages, that have at least two
+ different versions in repositories. Only not installed ones with
+ all of their dependencies intalled are selected.
+ And from those, few of the smallest are downloaded as rpms.
+ @return list of instances of Package of selected packages
+ """
+ if ( use_cache and not force_update
+ and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
+ pkgdb = load_pkgdb(cache_dir)
+ valid_db = True
+ for pkg in pkgdb:
+ if ( not os.path.exists(_make_rpm_path(pkg, cache_dir))
+ or not os.path.exists(_make_rpm_path(pkg, cache_dir, True))):
+ valid_db = False
+ #print "Old package database is not valid"
+ break
+ if valid_db:
+ return pkgdb
+ #print "Getting installed packages"
+ installed = set(check_output( #pylint: disable=E1103
+ ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
+ #print "Getting all available packages"
+ avail_str = check_output(['yum', 'list', 'available', '--showduplicates'])
+ # list of lists of packages with the same name, longer than 2
+ #print "Finding duplicates"
+ dups_list = _filter_duplicates(installed, avail_str)
+ #print "Selecting only those (from %d) with installed dependencies" % \
+ #len(dups_list)
+ selected = _check_pkg_dependencies(installed, dups_list)
+ #print "Selecting the smallest ones"
+ pkgdb = _sorted_db_by_size(selected)
+ if use_cache:
+ repolist = _get_repo_list()
+ _download_pkgdb(repolist, pkgdb, cache_dir)
+ #print "Backing up database information"
+ write_pkgdb(pkgdb, cache_dir)
+ return pkgdb
+
diff --git a/src/software/test/run_tests.py b/src/software/test/run_tests.py
new file mode 100755
index 0000000..9d290a1
--- /dev/null
+++ b/src/software/test/run_tests.py
@@ -0,0 +1,257 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Radek Novacek <rnovacek@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+"""
+Runs all defined tests.
+
+Test modules are expected to be in directory with this script and
+should contain subclasses of unittest.TestCase, that will be loaded.
+Preferably a suite() function should be defined there as well.
+They must be named according to shell regexp: "test_*.py".
+"""
+
+import argparse
+import copy
+import getpass
+import inspect
+import pywbem
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+import rpmcache
+
+# this is a global variable, that can be modified by environment or program
+# argument
+CACHE_DIR = ''
+
+def parse_cmd_line():
+ """
+ Use ArgumentParser to parse command line arguments.
+ @return (parsed arguments object, arguments for unittest.main())
+ """
+ parser = argparse.ArgumentParser(
+ add_help=False, # handle help message ourselves
+ description="Test OpenLMI Software providers. Arguments"
+ " for unittest main function can be added after \"--\""
+ " switch.")
+ parser.add_argument("--url",
+ default=os.environ.get("LMI_CIMOM_URL", "http://localhost:5988"),
+ help="Network port to use for tests")
+ parser.add_argument("-u", "--user",
+ default=os.environ.get("LMI_CIMOM_USERNAME", ''),
+ help="User for broker authentication.")
+ parser.add_argument("-p", "--password",
+ default=os.environ.get("LMI_CIMOM_PASSWORD", ''),
+ help="User's password.")
+ dangerous_group = parser.add_mutually_exclusive_group()
+ dangerous_group.add_argument("--run-dangerous", action="store_true",
+ default=(os.environ.get('LMI_RUN_DANGEROUS', '0') == '1'),
+ help="Run also tests dangerous for this machine"
+ " (tests, that manipulate with software database)."
+ " Overrides environment variable LMI_RUN_DANGEROUS.")
+ dangerous_group.add_argument('--no-dangerous', action="store_false",
+ dest="run_dangerous",
+ default=os.environ.get('LMI_RUN_DANGEROUS', '0') == '1',
+ help="Disable dangerous tests.")
+ cache_group = parser.add_mutually_exclusive_group()
+ cache_group.add_argument("-c", "--use-cache", action="store_true",
+ default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
+ help="Use a cache directory to download rpm packages for"
+ " testing purposes. It greatly speeds up testing."
+ " Also a database file \"${LMI_SOFTWARE_CACHE_DIR}/%s\""
+ " will be created to store information"
+ " for this system. Overrides environment variable"
+ " LMI_SOFTWARE_USE_CACHE." % rpmcache.DB_BACKUP_FILE)
+ cache_group.add_argument("--no-cache", action="store_false",
+ dest="use_cache",
+ default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
+ help="Do not cache rpm packages for speeding up tests."
+ " Overrides environment variable LMI_SOFTWARE_USE_CACHE.")
+ parser.add_argument('--cache-dir',
+ default=os.environ.get('LMI_SOFTWARE_CACHE_DIR', ''),
+ help="Use particular directory, instead of temporary one, to store"
+ " rpm packages for testing purposes. Overrides nevironment"
+ " variable LMI_SOFTWARE_CACHE_DIR.")
+ parser.add_argument('--force-update', action="store_true",
+ help="Force update of package database. Otherwise an old"
+ " one will be used (if any exists).")
+ parser.add_argument('-l', '--list-tests', action="store_true",
+ help="List all possible test names.")
+ parser.add_argument('-h', '--help', action="store_true",
+ help="Show help message.")
+
+ argv = copy.copy(sys.argv)
+ rest = []
+ if '--' in argv:
+ index = argv.index('--')
+ argv = argv[:index]
+ rest = sys.argv[index + 1:]
+ args, unknown_args = parser.parse_known_args(argv)
+ if args.help:
+ parser.print_help()
+ print
+ print "*"*79
+ print " Unit test options (parsed after \"--\" switch)."
+ print "*"*79
+ print
+ unittest.main(argv=[argv[0], "--help"])
+ return (args, unknown_args + rest)
+
+def try_connection(args):
+ """
+ Try to connect to cim broker. If authentication fails, ask
+ the user for credentials in loop.
+ @return (user, password)
+ """
+ user = args.user
+ password = args.password
+ while False:
+ try:
+ pywbem.WBEMConnection(args.url, (user, password))
+ break
+ except pywbem.cim_http.AuthError as exc:
+ user = args.user
+ sys.stderr.write("Authentication error\n")
+ if exc.args[0] == pywbem.CIM_ERR_ACCESS_DENIED:
+ if not user:
+ user = raw_input("User: ")
+ password = getpass.getpass()
+
+def prepare_environment(args):
+ """
+ Set the environment for test scripts.
+ """
+ os.environ['LMI_CIMOM_URL'] = args.url
+ os.environ['LMI_CIMOM_USERNAME'] = args.user
+ os.environ['LMI_CIMOM_PASSWORD'] = args.password
+ os.environ['LMI_RUN_DANGEROUS'] = (
+ '1' if args.run_dangerous else '0')
+ os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0'
+ if args.use_cache:
+ os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR
+
+def load_tests(loader, standard_tests, pattern):
+ """
+ Helper function for unittest.main() test loader.
+ @return TestSuite instance
+ """
+ this_dir = os.path.dirname(__file__)
+ if standard_tests:
+ suite = standard_tests
+ else:
+ suite = unittest.TestSuite()
+ discover_kwargs = dict(start_dir=this_dir)
+ if pattern is not None:
+ discover_kwargs['pattern'] = pattern
+ tests = loader.discover(**discover_kwargs)
+ suite.addTests(tests)
+ return suite
+
+class LMITestLoader(unittest.TestLoader):
+ """
+ Customized test loader to make invoking particular tests a lot easier.
+ """
+
+ @staticmethod
+ def find_in_test_nodes(node, name):
+ """
+ Traverses suite tree nodes to find a test named name.
+ @param name is a name of test to find
+ @return desired TestCase or test function
+ """
+ for subnode in node:
+ if isinstance(subnode, unittest.TestSuite):
+ subnode = LMITestLoader.find_in_test_nodes(subnode, name)
+ if subnode is not None:
+ return subnode
+ elif isinstance(subnode, unittest.TestCase):
+ if name == subnode.__class__.__name__:
+ return subnode
+ elif inspect.isfunction(subnode) and name == subnode.__name__:
+ return subnode
+
+ def loadTestsFromName(self, name, module=None): #pylint: disable=C0103
+ """
+ Override of parent method to make test cases accessible to by their
+ names from command line.
+ """
+ suite = load_tests(self, None, None)
+ parts = name.split('.')
+ node = suite
+ for part in parts:
+ node = LMITestLoader.find_in_test_nodes(node, part)
+ if node is None:
+ # name not found
+ return unittest.TestLoader.loadTestsFromName(self, name, module)
+ return node
+
+def unwind_test_suite_tree(node):
+ """
+ Make a list of test names out of TestSuite.
+ @param node is a suite
+ @return [ test_name, ... ]
+ """
+ result = []
+ for subnode in node:
+ if isinstance(subnode, unittest.TestSuite):
+ result.extend(unwind_test_suite_tree(subnode))
+ elif isinstance(subnode, unittest.TestCase):
+ result.append( subnode.__class__.__name__
+ + '.'+subnode._testMethodName)
+ elif inspect.isfunction(subnode):
+ result.append(subnode.__name__)
+ return result
+
+def list_tests():
+ """
+ Prints all available tests to stdout and exits.
+ """
+ suite = load_tests(LMITestLoader(), None, None)
+ test_names = unwind_test_suite_tree(suite)
+ print "\n".join(test_names)
+ sys.exit(0)
+
+def main():
+ """
+ Main functionality of script.
+ """
+ global CACHE_DIR
+ args, ut_args = parse_cmd_line()
+ if args.list_tests:
+ list_tests()
+ if args.use_cache and not args.cache_dir:
+ CACHE_DIR = tempfile.mkdtemp(suffix="software_database")
+ elif args.use_cache:
+ CACHE_DIR = args.cache_dir
+ try_connection(args)
+ rpmcache.get_pkg_database(args.force_update, args.use_cache, CACHE_DIR)
+ prepare_environment(args)
+ test_program = unittest.main(argv=ut_args,
+ testLoader=LMITestLoader(), exit=False)
+ if args.use_cache and not args.cache_dir:
+ shutil.rmtree(CACHE_DIR)
+ sys.exit(test_program.result.wasSuccessful())
+
+if __name__ == '__main__':
+ main()
+
diff --git a/src/software/test/test_software_file_check.py b/src/software/test/test_software_file_check.py
index 0e91bfa..cb09282 100755
--- a/src/software/test/test_software_file_check.py
+++ b/src/software/test/test_software_file_check.py
@@ -18,31 +18,42 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Unit tests for LMI_SoftwareFileCheck provider.
+"""
-from common import *
import hashlib
-import shutil
-import socket
-import stat
+import os
+import pywbem
+import re
+import subprocess
+import unittest
+from collections import namedtuple
-re_checksum = re.compile(r'^([0-9a-fA-F]+)\s+.*')
+import common
-PassedFlags = namedtuple("PassedFlags",
+RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*')
+
+PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103
"exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime")
-class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
+class TestSoftwareFileCheck(common.SoftwareBaseTestCase):
+ #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
CLASS_NAME = "LMI_SoftwareFileCheck"
KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState"
, "TargetOperatingSystem", "Version")
hash_num2algo = {
- 1 : hashlib.md5,
- 2 : hashlib.sha1,
- 8 : hashlib.sha256,
- 9 : hashlib.sha384,
- 10 : hashlib.sha512,
- 11 : hashlib.sha224
+ 1 : hashlib.md5, #pylint: disable=E1101
+ 2 : hashlib.sha1, #pylint: disable=E1101
+ 8 : hashlib.sha256, #pylint: disable=E1101
+ 9 : hashlib.sha384, #pylint: disable=E1101
+ 10 : hashlib.sha512, #pylint: disable=E1101
+ 11 : hashlib.sha224 #pylint: disable=E1101
}
hash_num2cmd = {
@@ -63,25 +74,74 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
11 : 56
}
- def make_op(self, nevra, filename):
- op = self.op.copy()
- m = re_nevra.match(nevra)
- name, version = [ m.group(k) for k in ("name", "ver") ]
- op["Name"] = filename
- op["Version"] = version
- op["CheckID"] = "%s#%s" % (name, filename)
- op["SoftwareElementState"] = pywbem.Uint16(2)
- op["TargetOperatingSystem"] = pywbem.Uint16(36)
- op["SoftwareElementID"] = nevra
- return op
+ def make_op(self, pkg, filename, newer=True):
+ """
+ @return object path of LMI_SoftwareFileCheck
+ """
+ objpath = self.objpath.copy()
+ objpath["Name"] = filename
+ objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver")
+ objpath["CheckID"] = "%s#%s" % (pkg.name, filename)
+ objpath["SoftwareElementState"] = pywbem.Uint16(2)
+ objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
+ objpath["SoftwareElementID"] = pkg.get_nevra(newer)
+ return objpath
+
+ def assertEqualSEID(self, id1, id2):
+ """
+ This is for comparison of SoftwareElementID property values.
+ """
+ if ( not isinstance(id1, basestring)
+ or not isinstance(id2, basestring)):
+ return common.SoftwareBaseTestCase.assertEqual(self, id1, id2)
+ match1 = common.RE_NEVRA.match(id1)
+ match2 = common.RE_NEVRA.match(id2)
+ if not match1 or not match2:
+ return common.SoftwareBaseTestCase.assertEqual(self, id1, id2)
+ if any( match1.group(g) != match2.group(g)
+ for g in ('name', 'ver', 'rel', 'arch')):
+ return common.SoftwareBaseTestCase.assertEqual(self, id1, id2)
+ epoch1 = match1.group('epoch')
+ epoch2 = match2.group('epoch')
+ if not epoch1:
+ epoch1 = '0'
+ if not epoch2:
+ epoch2 = '0'
+ if epoch1 != epoch2:
+ return common.SoftwareBaseTestCase.assertEqual(self, id1, id2)
+ return True
+
+ def assertEqual(self, op1, op2):
+ """
+ This is override for object paths, that allows some differences
+ (like missing epoch in SoftwareElementID).
+ """
+ if ( not isinstance(op1, pywbem.CIMInstanceName)
+ or not isinstance(op2, pywbem.CIMInstanceName)
+ or op1.classname != op2.classname
+ or op1.namespace != op2.namespace
+ or op1.keybindings.keys() != op2.keybindings.keys()
+ or any(op1[a] != op2[a] for a in
+ ( 'Name', 'Version', 'CheckID', 'SoftwareElementState'
+ , 'TargetOperatingSystem'))):
+ return common.SoftwareBaseTestCase.assertEqual(self, op1, op2)
+ return self.assertEqualSEID(
+ op1['SoftwareElementID'], op2['SoftwareElementID'])
+ return True
def make_checksum_str(self, csumnum, filename):
- return re_checksum.match(subprocess.check_output([
+ """
+ @return checksum of installed file
+ """
+ return RE_CHECKSUM.match(subprocess.check_output([
self.hash_num2cmd[csumnum], filename])).group(1).lower()
- def do_test_symlink(self, op, nevra, f, inst):
- target = os.readlink(f)
- stats = os.lstat(f)
+ def do_test_symlink(self, filepath, inst):
+ """
+ Assert some details about symlink.
+ """
+ target = os.readlink(filepath)
+ stats = os.lstat(filepath)
self.assertEqual(inst["FileType"], pywbem.Uint16(3))
self.assertEqual(inst["FileUserID"], stats.st_uid)
@@ -96,9 +156,9 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
# modify owner
prev_user = inst["FileUserID"]
- prev_mtime = inst["LastModificationTime"]
- prev_pflags = PassedFlags(*inst["PassedFlags"])
- os.lchown(f, stats.st_uid + 1, -1)
+ #prev_mtime = inst["LastModificationTime"]
+ #prev_pflags = PassedFlags(*inst["PassedFlags"])
+ os.lchown(filepath, stats.st_uid + 1, -1)
inst = self.conn.GetInstance(InstanceName=inst.path)
self.assertEqual(inst["FileUserID"], inst["ExpectedFileUserID"] + 1)
self.assertEqual(inst["FileUserID"], prev_user + 1)
@@ -118,9 +178,10 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertTrue(cur_pflags.mtime)
# modify link_target
- os.remove(f)
- os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), f)
- os.lchown(f, inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"])
+ os.remove(filepath)
+ os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath)
+ os.lchown(filepath, inst["ExpectedFileUserID"],
+ inst["ExpectedFileGroupID"])
inst = self.conn.GetInstance(InstanceName=inst.path)
cur_pflags = PassedFlags(*inst["PassedFlags"])
@@ -138,8 +199,11 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertTrue(cur_pflags.gid)
self.assertTrue(cur_pflags.mtime)
- def do_test_directory(self, op, nevra, f, inst):
- stats = os.lstat(f)
+ def do_test_directory(self, filepath, inst):
+ """
+ Assert some details about directory.
+ """
+ stats = os.lstat(filepath)
self.assertEqual(inst["FileType"], pywbem.Uint16(2))
self.assertEqual(inst["FileUserID"], stats.st_uid)
@@ -152,8 +216,11 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertEqual(inst["LastModificationTime"],
int(stats.st_mtime))
- def do_test_file(self, op, nevra, f, inst):
- stats = os.lstat(f)
+ def do_test_file(self, filepath, inst):
+ """
+ Assert some details about regurar file.
+ """
+ stats = os.lstat(filepath)
self.assertEqual(inst["FileType"], pywbem.Uint16(1))
self.assertEqual(inst["FileUserID"], stats.st_uid)
@@ -161,7 +228,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertEqual(inst["FileMode"], stats.st_mode)
self.assertEqual(inst["FileSize"], stats.st_size)
self.assertIs(inst["LinkTarget"], None)
- csum = self.make_checksum_str(inst['FileChecksumType'], f)
+ csum = self.make_checksum_str(inst['FileChecksumType'], filepath)
self.assertEqual(inst["FileChecksum"].lower(), csum)
self.assertEqual(inst["ExpectedLastModificationTime"],
inst["LastModificationTime"])
@@ -169,7 +236,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
int(stats.st_mtime))
# make it longer
- with open(f, "a+") as fobj:
+ with open(filepath, "a+") as fobj:
fobj.write("data\n")
inst = self.conn.GetInstance(InstanceName=inst.path)
cur_pflags = PassedFlags(*inst["PassedFlags"])
@@ -189,9 +256,10 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertFalse(cur_pflags.mtime)
# change file type
- os.remove(f)
- os.symlink(f, f)
- os.lchown(f, inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"])
+ os.remove(filepath)
+ os.symlink(filepath, filepath)
+ os.lchown(filepath, inst["ExpectedFileUserID"],
+ inst["ExpectedFileGroupID"])
inst = self.conn.GetInstance(InstanceName=inst.path)
cur_pflags = PassedFlags(*inst["PassedFlags"])
self.assertNotEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"])
@@ -213,7 +281,7 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertFalse(cur_pflags.mtime)
# remove it
- os.remove(f)
+ os.remove(filepath)
inst = self.conn.GetInstance(InstanceName=inst.path)
cur_pflags = PassedFlags(*inst["PassedFlags"])
self.assertEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"])
@@ -236,73 +304,82 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertFalse(cur_pflags.gid)
self.assertFalse(cur_pflags.mtime)
+ @common.mark_dangerous
def test_get_instance(self):
- for nevra, files in pkg_files:
- m = re_nevra.match(nevra)
- name, version = [ m.group(k) for k in ("name", "ver") ]
- if is_installed(nevra=nevra) and not verify_pkg(name):
- remove_pkg(name)
- if not is_installed(name):
- install_pkg(nevra=nevra)
- self.assertTrue(is_installed(nevra=nevra))
- for f in files:
- op = self.make_op(nevra, f)
-
- inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False)
+ """
+ Tests GetInstance call.
+ """
+ for pkg in self.pkgdb:
+ files = self.pkg_files[pkg.name]
+ if ( common.is_installed(pkg)
+ and not common.verify_pkg(pkg.name)):
+ common.remove_pkg(pkg.name)
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg)
+ self.assertTrue(common.is_installed(pkg))
+ for filepath in files:
+ objpath = self.make_op(pkg, filepath)
+
+ inst = self.conn.GetInstance(
+ InstanceName=objpath,
+ LocalOnly=False)
self.assertIsInstance(inst, pywbem.CIMInstance)
- self.assertEqual(inst.path, op)
+ self.assertEqual(inst.path, objpath)
for key in self.KEYS:
- self.assertEqual(inst[key], op[key])
+ if key.lower() == "softwareelementid":
+ self.assertEqualSEID(inst[key], objpath[key])
+ else:
+ self.assertEqual(inst[key], objpath[key])
self.assertTrue(inst["FileExists"])
self.assertEqual(len(inst["PassedFlags"]), 10)
- self.assertTrue(all(f is True for f in inst["PassedFlags"]))
+ self.assertTrue(all( filepath is True
+ for filepath in inst["PassedFlags"]))
for prop in ( "FileType", "FileUserID", "FileGroupID"
, "FileMode", "FileSize", "LinkTarget"
, "FileChecksum", "FileModeFlags"):
self.assertEqual(inst["Expected"+prop], inst[prop])
- if os.path.islink(f):
- self.do_test_symlink(op, nevra, f, inst)
- elif os.path.isdir(f):
- self.do_test_directory(op, nevra, f, inst)
- elif os.path.isfile(f):
- self.do_test_file(op, nevra, f, inst)
-
+ if os.path.islink(filepath):
+ self.do_test_symlink(filepath, inst)
+ elif os.path.isdir(filepath):
+ self.do_test_directory(filepath, inst)
+ elif os.path.isfile(filepath):
+ self.do_test_file(filepath, inst)
+
+ @common.mark_dangerous
def test_invoke_method(self):
- for nevra, files in pkg_files:
- m = re_nevra.match(nevra)
- name, version = [ m.group(k) for k in ("name", "ver") ]
- if is_installed(nevra=nevra) and not verify_pkg(name):
- remove_pkg(name)
- if not is_installed(name):
- install_pkg(nevra=nevra)
- self.assertTrue(is_installed(nevra=nevra))
- for f in files:
- op = self.make_op(nevra, f)
-
- (rval, oparms) = self.conn.InvokeMethod(
+ """
+ Tests Invoke method invocation.
+ """
+ for pkg in self.pkgdb:
+ files = self.pkg_files[pkg.name]
+ if common.is_installed(pkg) and not common.verify_pkg(pkg.name):
+ common.remove_pkg(pkg.name)
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg)
+ self.assertTrue(common.is_installed(pkg))
+ for filepath in files:
+ objpath = self.make_op(pkg, filepath)
+
+ (rval, _) = self.conn.InvokeMethod(
MethodName="Invoke",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(0))
# modify file
- if os.path.isfile(f):
- os.remove(f)
+ if os.path.isfile(filepath):
+ os.remove(filepath)
else:
- os.lchown(f, os.stat(f).st_uid + 1, -1)
- (rval, oparms) = self.conn.InvokeMethod(
+ os.lchown(filepath, os.stat(filepath).st_uid + 1, -1)
+ (rval, _) = self.conn.InvokeMethod(
MethodName="Invoke",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(2))
- @classmethod
- def tearDownClass(cls):
- for pkg_nevra, files in pkg_files:
- name = re_nevra.match(pkg_nevra).group('name')
- if is_installed(name) and not verify_pkg(name):
- remove_pkg(name)
- if not is_installed(name):
- install_pkg(nevra=pkg_nevra)
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSoftwareFileCheck)
if __name__ == "__main__":
unittest.main()
diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py
index 54b8f35..c6e64de 100755
--- a/src/software/test/test_software_installed_package.py
+++ b/src/software/test/test_software_installed_package.py
@@ -18,198 +18,240 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Unit tests for LMI_SoftwareInstalledPackage provider.
+"""
-from common import *
+import os
+import pywbem
import shutil
import socket
import stat
+import unittest
-class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
+import common
+import rpmcache
+
+class TestSoftwareInstalledPackage(common.SoftwareBaseTestCase):
+ """
+ Basic cim operations test.
+ """
CLASS_NAME = "LMI_SoftwareInstalledPackage"
KEYS = ("Software", "System")
- def make_op(self, name, epoch, ver, rel, arch, ses=2):
- op = self.op.copy()
+ def make_op(self, pkg, newer=True, ses=2):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of LMI_SoftwareInstaledPackage
+ """
+ objpath = self.objpath.copy()
pkg_op = pywbem.CIMInstanceName(
classname="LMI_SoftwarePackage", namespace="root/cimv2",
keybindings={
- "Name" : name,
- "SoftwareElementID" : make_nevra(
- name, epoch, ver, rel, arch, "ALWAYS"),
+ "Name" : pkg.name,
+ "SoftwareElementID" : pkg.get_nevra(newer, 'ALWAYS'),
"SoftwareElementState" : pywbem.Uint16(ses),
"TargetOperatingSystem" : pywbem.Uint16(36),
- "Version" : ver })
+ "Version" : getattr(pkg, 'up_ver' if newer else 'ver') })
system_op = pywbem.CIMInstanceName(
classname="Linux_ComputerSystem", namespace="root/cimv2",
keybindings={
"CreationClassName" : "Linux_ComputerSystem",
"Name" : socket.gethostname() })
- op["Software"] = pkg_op
- op["System"] = system_op
- return op
+ objpath["Software"] = pkg_op
+ objpath["System"] = system_op
+ return objpath
+ @common.mark_dangerous
def test_get_instance(self):
- for pkg in packages:
- if not is_installed(*pkg[:5]):
- install_pkg(*pkg[:5])
- op = self.make_op(*pkg[:5])
- inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False)
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ TODO: test this in non-dangerous way
+ """
+ for pkg in self.pkgdb:
+ if not common.is_installed(pkg):
+ common.install_pkg(pkg)
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
self.assertIsSubclass(inst.path.classname, self.CLASS_NAME)
self.assertIsSubclass(
- inst.path['System'].classname, op['System'].classname)
- self.assertIsSubclass(
- inst.path['Software'].classname, op['Software'].classname)
+ inst.path['System'].classname, objpath['System'].classname)
+ self.assertIsSubclass(inst.path['Software'].classname,
+ objpath['Software'].classname)
self.assertEqual(len(inst.path.keys()), 2)
for key in self.KEYS:
self.assertEqual(inst[key], inst.path[key])
- self.assertEqual(inst['Software'], op['Software'])
- remove_pkg(pkg.name)
- op['Software']['SoftwareElementState'] = pywbem.Uint16(1)
- with self.assertRaises(pywbem.CIMError) as cm:
- self.conn.GetInstance(InstanceName=op, LocalOnly=False)
- self.assertEqual(cm.exception.args[0], pywbem.CIM_ERR_NOT_FOUND)
+ self.assertEqual(inst['Software'], objpath['Software'])
+ common.remove_pkg(pkg.name)
+ objpath['Software']['SoftwareElementState'] = pywbem.Uint16(1)
+ with self.assertRaises(pywbem.CIMError) as cmngr:
+ self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_NOT_FOUND)
+ @common.mark_dangerous
def test_enum_instances(self):
- pkg = packages[0]
- if is_installed(*pkg[:5]):
- remove_pkg(pkg.name)
+ """
+ Tests EnumInstances call.
+ TODO: test this in non-dangerous way
+ """
+ pkg = self.pkgdb[0]
+ if common.is_installed(pkg.name):
+ common.remove_pkg(pkg.name)
insts1 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
- install_pkg(*pkg[:5])
+ common.install_pkg(pkg)
insts2 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
self.assertEqual(len(insts1) + 1, len(insts2))
- op = self.make_op(*pkg[:5])
- self.assertIn(op['Software'], (inst['Software'] for inst in insts2))
+ objpath = self.make_op(pkg)
+ self.assertIn(objpath['Software'],
+ (inst['Software'] for inst in insts2))
self.assertTrue(all(isinstance(inst, pywbem.CIMInstanceName)
for inst in insts1))
insts1 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
- remove_pkg(pkg.name)
+ common.remove_pkg(pkg.name)
insts2 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
self.assertEqual(len(insts2) + 1, len(insts1))
- self.assertIn(op['Software'], (inst['Software'] for inst in insts1))
+ self.assertIn(objpath['Software'],
+ (inst['Software'] for inst in insts1))
self.assertTrue(all(inst['Software'] == inst.path['Software']
for inst in insts1))
self.assertTrue(all(inst['System'] == inst.path['System']
for inst in insts1))
+ @common.mark_dangerous
def test_create_instance(self):
- for pkg in packages:
- if is_installed(pkg.name):
- remove_pkg(pkg.name)
- self.assertFalse(is_installed(pkg.name))
- op = self.make_op(*(list(pkg[:5]) + [1]))
- inst = pywbem.CIMInstance(classname=op.classname, path=op)
- inst["Software"] = op["Software"]
- inst["System"] = op["System"]
+ """
+ Tests CreateInstance call.
+ """
+ for pkg in self.pkgdb:
+ if common.is_installed(pkg.name):
+ common.remove_pkg(pkg.name)
+ self.assertFalse(common.is_installed(pkg.name))
+ objpath = self.make_op(pkg, ses=1)
+ inst = pywbem.CIMInstance(classname=objpath.classname, path=objpath)
+ inst["Software"] = objpath["Software"]
+ inst["System"] = objpath["System"]
iname = self.conn.CreateInstance(NewInstance=inst)
self.assertIsInstance(iname, pywbem.CIMInstanceName)
- op["Software"]["SoftwareElementState"] = pywbem.Uint16(2)
- self.assertEqual(iname["Software"], op["Software"])
+ objpath["Software"]["SoftwareElementState"] = pywbem.Uint16(2)
+ self.assertEqual(iname["Software"], objpath["Software"])
self.assertIsInstance(iname["System"], pywbem.CIMInstanceName)
self.assertIsSubclass(iname["System"].classname,
- op["System"].classname)
- self.assertEqual(set(iname.keys()), set(op.keys()))
- self.assertTrue(is_installed(pkg.name))
+ objpath["System"].classname)
+ self.assertEqual(set(iname.keys()), set(objpath.keys()))
+ self.assertTrue(common.is_installed(pkg.name))
- with self.assertRaises(pywbem.CIMError) as cm:
+ with self.assertRaises(pywbem.CIMError) as cmngr:
self.conn.CreateInstance(NewInstance=inst)
- self.assertEqual(cm.exception.args[0],
+ self.assertEqual(cmngr.exception.args[0],
pywbem.CIM_ERR_ALREADY_EXISTS)
-
+
+ @common.mark_dangerous
def test_delete_instance(self):
- for pkg in packages:
- if not is_installed(pkg.name):
- install_pkg(*pkg[:5])
- self.assertTrue(is_installed(pkg.name))
- op = self.make_op(*pkg[:5])
- self.conn.DeleteInstance(op)
- self.assertFalse(is_installed(pkg.name))
- with self.assertRaises(pywbem.CIMError) as cm:
- self.conn.DeleteInstance(op)
- self.assertEqual(cm.exception.args[0], pywbem.CIM_ERR_NOT_FOUND)
+ """
+ Tests DeleteInstance call.
+ """
+ for pkg in self.pkgdb:
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg)
+ self.assertTrue(common.is_installed(pkg.name))
+ objpath = self.make_op(pkg)
+ self.conn.DeleteInstance(objpath)
+ self.assertFalse(common.is_installed(pkg.name))
+ with self.assertRaises(pywbem.CIMError) as cmngr:
+ self.conn.DeleteInstance(objpath)
+ self.assertEqual(cmngr.exception.args[0], pywbem.CIM_ERR_NOT_FOUND)
+ @common.mark_dangerous
def test_check_integrity(self):
- for pkg_nevra, files in pkg_files:
- m = re_nevra.match(pkg_nevra)
- name, epoch, ver, rel, arch = [ m.group(k) for k in (
- "name", "epoch", "ver", "rel", "arch") ]
- if ( (is_installed(nevra=pkg_nevra) and not verify_pkg(name))
- or (is_installed(name) and not is_installed(nevra=pkg_nevra))) :
- remove_pkg(name)
- if not is_installed(name):
- install_pkg(name, epoch, ver, rel, arch)
- self.assertTrue(is_installed(nevra=pkg_nevra))
+ """
+ Tests CheckIntegrity call.
+ TODO: test this in non-dangerous way
+ """
+ for pkg in self.pkgdb:
+ files = self.pkg_files[pkg.name]
+ if ( ( common.is_installed(pkg)
+ and not common.verify_pkg(pkg.name))
+ or ( common.is_installed(pkg.name)
+ and not common.is_installed(pkg))) :
+ common.remove_pkg(pkg.name)
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg)
+ self.assertTrue(common.is_installed(pkg))
- op = self.make_op(name, epoch, ver, rel, arch)
+ objpath = self.make_op(pkg)
(rval, oparms) = self.conn.InvokeMethod(
MethodName="CheckIntegrity",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(0)) # check passed
self.assertEqual(len(oparms), 1)
self.assertIn("Failed", oparms)
self.assertEqual(len(oparms["Failed"]), 0)
cnt_bad = 0
- for f in files:
- stats = os.lstat(f)
- if os.path.islink(f): # modify symbolic link
- target = os.readlink(f)
- os.remove(f)
- os.symlink(target, f) # just touch symlink
+ for file_path in files:
+ stats = os.lstat(file_path)
+ if os.path.islink(file_path): # modify symbolic link
+ target = os.readlink(file_path)
+ os.remove(file_path)
+ os.symlink(target, file_path) # just touch symlink
(rval, oparms) = self.conn.InvokeMethod(
MethodName="CheckIntegrity",
- ObjectName=op)
+ ObjectName=objpath)
# symlink must pass
self.assertEqual(len(oparms["Failed"]), cnt_bad)
- os.remove(f)
+ os.remove(file_path)
# now change target
- os.symlink("wrong_link_target", f)
- elif os.path.isdir(f): # check directory
- os.chmod(f, stats.st_mode) # just touch dir
+ os.symlink("wrong_link_target", file_path)
+ elif os.path.isdir(file_path): # check directory
+ os.chmod(file_path, stats.st_mode) # just touch dir
(rval, oparms) = self.conn.InvokeMethod(
MethodName="CheckIntegrity",
- ObjectName=op)
+ ObjectName=objpath)
# dir must pass
self.assertEqual(len(oparms["Failed"]), cnt_bad)
# modify read access of directory
- os.chmod(f, stats.st_mode ^ stat.S_IROTH)
+ os.chmod(file_path, stats.st_mode ^ stat.S_IROTH)
else: # modify regular file
# just touch file - this is enough to make it fail
- with open(f, "w+") as fobj: pass
+ with open(file_path, "w+"):
+ pass
cnt_bad += 1
(rval, oparms) = self.conn.InvokeMethod(
MethodName="CheckIntegrity",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(1))
self.assertEqual(len(oparms), 1)
self.assertIn("Failed", oparms)
self.assertEqual(len(oparms["Failed"]), cnt_bad)
- self.assertIn(f, (p["Name"] for p in oparms["Failed"]))
+ self.assertIn(file_path, (p["Name"] for p in oparms["Failed"]))
+ @common.mark_dangerous
def test_method_update(self):
- for pkg in packages:
- if not pkg.updatable: continue
- if is_installed(pkg.name) and not is_installed(*pkg[:5]):
- remove_pkg(pkg.name)
- if not is_installed(pkg.name):
- install_pkg(*pkg[:5])
- self.assertTrue(is_installed(*pkg[:5]))
-
- op = self.make_op(*pkg[:5])
- op_up = self.make_op(pkg.name, pkg.up_epoch,
- pkg.up_ver, pkg.up_rel, pkg.arch)
+ """
+ Tests Update method invocation.
+ """
+ for pkg in self.pkgdb:
+ if ( common.is_installed(pkg.name)
+ and not common.is_installed(pkg, False)):
+ common.remove_pkg(pkg.name)
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg, False)
+ self.assertTrue(common.is_installed(pkg, False))
+
+ objpath = self.make_op(pkg, False)
+ op_up = self.make_op(pkg)
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Update",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint16(1))
self.assertEqual(len(oparms), 1)
self.assertIn("Installed", oparms)
self.assertEqual(oparms["Installed"], op_up["Software"])
- self.assertTrue(is_installed(pkg.name, pkg.up_epoch,
- pkg.up_ver, pkg.up_rel, pkg.arch))
+ self.assertTrue(common.is_installed(pkg))
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Update",
@@ -217,44 +259,42 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
self.assertEqual(rval, pywbem.Uint16(0))
self.assertEqual(len(oparms), 1)
self.assertEqual(oparms["Installed"], op_up["Software"])
+ self.assertTrue(common.is_installed(pkg.name))
+ self.assertFalse(common.is_installed(pkg, False))
- with self.assertRaises(pywbem.CIMError) as cm:
+ with self.assertRaises(pywbem.CIMError) as cmngr:
self.conn.InvokeMethod(
MethodName="Update",
- ObjectName=op)
- self.assertEqual(cm.exception.args[0],
+ ObjectName=objpath)
+ self.assertEqual(cmngr.exception.args[0],
pywbem.CIM_ERR_NOT_FOUND)
- remove_pkg(pkg.name)
- install_pkg(*pkg[:5])
- self.assertTrue(is_installed(*pkg[:5]))
+ common.remove_pkg(pkg.name)
+ common.install_pkg(pkg, False)
+ self.assertTrue(common.is_installed(pkg, False))
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Update",
- ObjectName=op,
+ ObjectName=objpath,
Epoch=pkg.epoch,
Version=pkg.ver,
Release=pkg.rel)
self.assertEqual(rval, pywbem.Uint16(0))
- self.assertEqual(oparms["Installed"], op["Software"])
-
+ self.assertEqual(oparms["Installed"], objpath["Software"])
+
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Update",
- ObjectName=op,
+ ObjectName=objpath,
Epoch=pkg.up_epoch,
Version=pkg.up_ver,
Release=pkg.up_rel)
self.assertEqual(rval, pywbem.Uint16(1))
self.assertEqual(oparms["Installed"], op_up["Software"])
- @classmethod
- def tearDownClass(cls):
- for pkg_nevra, files in pkg_files:
- name = re_nevra.match(pkg_nevra).group('name')
- if is_installed(name) and not verify_pkg(name):
- remove_pkg(name)
- if not is_installed(name):
- install_pkg(nevra=pkg_nevra)
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSoftwareInstalledPackage)
if __name__ == "__main__":
unittest.main()
diff --git a/src/software/test/test_software_package.py b/src/software/test/test_software_package.py
index f6fdf9c..d0144ca 100755
--- a/src/software/test/test_software_package.py
+++ b/src/software/test/test_software_package.py
@@ -18,91 +18,120 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Unit tests for LMI_SoftwareInstalledPackage provider.
+"""
-from common import *
+import pywbem
+import unittest
-class TestSoftwarePackage(SoftwareBaseTestCase):
+import common
+import rpmcache
+
+class TestSoftwarePackage(common.SoftwareBaseTestCase): #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
CLASS_NAME = "LMI_SoftwarePackage"
KEYS = ( "Name", "SoftwareElementID", "SoftwareElementState"
, "TargetOperatingSystem", "Version")
- def make_op(self, name, epoch, ver, rel, arch, ses=2):
- op = self.op.copy()
- op["Name"] = name
- op["SoftwareElementID"] = make_nevra(
- name, epoch, ver, rel, arch, "ALWAYS")
- op["SoftwareElementState"] = pywbem.Uint16(ses)
- op["TargetOperatingSystem"] = pywbem.Uint16(36)
- op["Version"] = ver
- return op
+ def make_op(self, pkg, newer=True, ses=2):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwarePackage
+ """
+ objpath = self.objpath.copy()
+ objpath["Name"] = pkg.name
+ objpath["SoftwareElementID"] = pkg.get_nevra(newer, "ALWAYS")
+ objpath["SoftwareElementState"] = pywbem.Uint16(ses)
+ objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
+ objpath["Version"] = getattr(pkg, 'up_ver' if newer else 'ver')
+ return objpath
+ @common.mark_dangerous
def test_get_instance(self):
- for pkg in packages:
- if is_installed(pkg.name):
- remove_pkg(pkg.name)
- op = self.make_op(pkg.name, pkg.epoch, pkg.ver,
- pkg.rel, pkg.arch, 1)
- inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False)
- self.assertEqual(inst.path, op)
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ TODO: test this in non-dangerous way
+ """
+ for pkg in self.pkgdb:
+ if common.is_installed(pkg.name):
+ common.remove_pkg(pkg.name)
+ objpath = self.make_op(pkg, ses=1)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath)
for key in self.KEYS:
self.assertTrue(inst.properties.has_key(key))
self.assertEqual(inst.path[key], inst[key])
- self.assertEqual(inst['Release'], pkg.rel)
- install_pkg(*pkg[:5])
- op['SoftwareElementState'] = pywbem.Uint16(2)
- inst = self.conn.GetInstance(InstanceName=op, LocalOnly=False)
- self.assertEqual(inst.path, op)
+ self.assertEqual(inst['Release'], pkg.up_rel)
+ common.install_pkg(pkg)
+ objpath['SoftwareElementState'] = pywbem.Uint16(2)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath)
# try to leave out epoch part
if pkg.epoch == "0":
- op_no_epoch = op.copy()
- op_no_epoch["SoftwareElementID"] = make_nevra(pkg.name, pkg.epoch,
- pkg.ver, pkg.rel, pkg.arch, "NEVER")
+ op_no_epoch = objpath.copy()
+ op_no_epoch["SoftwareElementID"] = rpmcache.make_nevra(pkg.name,
+ pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER")
self.assertNotIn(":", op_no_epoch["SoftwareElementID"])
inst = self.conn.GetInstance(
InstanceName=op_no_epoch, LocalOnly=False)
- self.assertIn(inst.path, (op, op_no_epoch))
+ self.assertIn(inst.path, (objpath, op_no_epoch))
+ @common.mark_dangerous
def test_method_install(self):
- for pkg in packages:
- if is_installed(pkg.name):
- remove_pkg(pkg.name)
- op = self.make_op(pkg.name, pkg.epoch, pkg.ver,
- pkg.rel, pkg.arch, 1)
+ """
+ Tests Install method invocation.
+ """
+ for pkg in self.pkgdb:
+ if common.is_installed(pkg.name):
+ common.remove_pkg(pkg.name)
+ objpath = self.make_op(pkg, ses=1)
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Install",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(1))
self.assertEqual(len(oparms), 1)
self.assertTrue(oparms.has_key('Installed'))
- op['SoftwareElementState'] = pywbem.Uint16(2)
- self.assertEqual(oparms['Installed'], op)
- self.assertTrue(is_installed(pkg.name))
+ objpath['SoftwareElementState'] = pywbem.Uint16(2)
+ self.assertEqual(oparms['Installed'], objpath)
+ self.assertTrue(common.is_installed(pkg.name))
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Install",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(0))
self.assertEqual(len(oparms), 1)
- self.assertEqual(oparms['Installed'], op)
+ self.assertEqual(oparms['Installed'], objpath)
+ @common.mark_dangerous
def test_method_remove(self):
- for pkg in packages:
- if not is_installed(pkg.name):
- install_pkg(*pkg[:5])
- op = self.make_op(*pkg[:5])
+ """
+ Tests Remove method invocation.
+ """
+ for pkg in self.pkgdb:
+ if not common.is_installed(pkg.name):
+ common.install_pkg(pkg)
+ objpath = self.make_op(pkg)
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Remove",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint16(1))
self.assertEqual(len(oparms), 0)
- self.assertFalse(is_installed(pkg.name))
- op["SoftwareElementState"] = pywbem.Uint16(1)
+ self.assertFalse(common.is_installed(pkg.name))
+ objpath["SoftwareElementState"] = pywbem.Uint16(1)
(rval, oparms) = self.conn.InvokeMethod(
MethodName="Remove",
- ObjectName=op)
+ ObjectName=objpath)
self.assertEqual(rval, pywbem.Uint32(0))
self.assertEqual(len(oparms), 0)
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSoftwarePackage)
+
if __name__ == '__main__':
unittest.main()