summaryrefslogtreecommitdiffstats
path: root/src/software/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/test')
-rw-r--r--src/software/test/base.py (renamed from src/software/test/common.py)153
-rw-r--r--src/software/test/package.py117
-rw-r--r--src/software/test/rpmcache.py422
-rwxr-xr-xsrc/software/test/run.py23
-rwxr-xr-xsrc/software/test/test_hosted_software_collection.py155
-rwxr-xr-xsrc/software/test/test_member_of_software_collection.py155
-rwxr-xr-xsrc/software/test/test_software_file_check.py527
-rwxr-xr-xsrc/software/test/test_software_identity.py154
-rwxr-xr-xsrc/software/test/test_software_installed_package.py351
-rwxr-xr-xsrc/software/test/test_software_package.py153
-rwxr-xr-xsrc/software/test/test_system_software_collection.py86
-rw-r--r--src/software/test/util.py82
12 files changed, 1070 insertions, 1308 deletions
diff --git a/src/software/test/common.py b/src/software/test/base.py
index d91f236..4e56f46 100644
--- a/src/software/test/common.py
+++ b/src/software/test/base.py
@@ -21,61 +21,15 @@
Common utilities and base class for all software tests.
"""
+import itertools
import os
import pywbem
-import re
import tempfile
import unittest
-from subprocess import call, check_output
+from subprocess import check_output
import rpmcache
-RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-
-def remove_pkg(pkg, *args):
- """
- Remove package with rpm command.
- @param pkg is either instance of Package or package name
- @param args is a list of parameters for rpm command
- """
- if isinstance(pkg, rpmcache.Package):
- pkg = pkg.name
- call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
-
-def install_pkg(pkg, newer=True, repolist=[]):
- """
- Install a specific package.
- @param pkg is either package name or instance of Package
- In latter case, a specific version is installed.
- @param repolist is a list of repositories, that should be
- used for downloading, if using yum
- when empty, all enabled repositories are used
- """
- if isinstance(pkg, rpmcache.Package):
- try:
- rpm_name = rpmcache.get_rpm_name(pkg, newer=newer)
- call(["rpm", "--quiet", "-i", rpm_name])
- return
- except rpmcache.MissingRPM:
- pass
- pkg = pkg.name
- rpmcache.run_yum('-q', '-y', 'install', pkg, repolist=repolist)
-
-def is_installed(pkg, newer=True):
- """
- Check, whether package is installed.
- Accepts the same parameters as install_pkg.
- @see install_pkg
- """
- return rpmcache.is_installed(pkg, newer)
-
-def verify_pkg(name):
- """
- @return output of command rpm, with verification output for package
- """
- return call(["rpm", "--quiet", "-Va", name]) == 0
-
def mark_dangerous(method):
"""
Decorator for methods of unittest.TestCase subclasses, that
@@ -86,21 +40,17 @@ def mark_dangerous(method):
else:
return unittest.skip("This test is marked as dangerous.")(method)
-def is_config_file(pkg, file_path):
- """
- @return True, if file_path is a configuration file of package pkg.
- """
- cmd = ['rpm', '-qc', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
-
-def is_doc_file(pkg, file_path):
+def mark_tedious(method):
"""
- @return True, if file_path is a documentation file of package pkg.
+ Decorator for methods of unittest.TestCase subclasses, that
+ skips tedious tests. Those running for very long time and usually
+ need a lot of memory. Environment variable "LMI_RUN_TEDIOUS" can
+ allow them
"""
- cmd = ['rpm', '-qd', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
+ if os.environ.get('LMI_RUN_TEDIOUS', '0') == '1':
+ return method
+ else:
+ return unittest.skip("This test is marked as tedious.")(method)
def get_pkg_files(pkg):
"""
@@ -124,9 +74,9 @@ def get_pkg_files(pkg):
elif len(symlinks) == 0 and os.path.islink(fpath):
symlinks.add(fpath)
elif not os.path.islink(fpath) and os.path.isfile(fpath):
- if len(configs) == 0 and is_config_file(pkg, fpath):
+ if len(configs) == 0 and rpmcache.has_pkg_config_file(pkg, fpath):
configs.add(fpath)
- elif len(docs) == 0 and is_doc_file(pkg, fpath):
+ elif len(docs) == 0 and rpmcache.has_pkg_doc_file(pkg, fpath):
docs.add(fpath)
elif len(files) == 0:
files.add(fpath)
@@ -142,6 +92,27 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
CLASS_NAME = "Define in subclass"
+ # will be filled when first needed
+ # it's a dictionary with items (pkg_name, [file_path1, ...])
+ PKGDB_FILES = None
+
+ @classmethod
+ def get_pkgdb_files(cls):
+ """
+ @return dictionary { pkg_name: ["file_path1, ...] }
+ """
+ if cls.PKGDB_FILES is not None:
+ return cls.PKGDB_FILES
+ SoftwareBaseTestCase.PKGDB_FILES = res = dict(
+ (pkg.name, get_pkg_files(pkg)) for pkg in itertools.chain(
+ cls.safe_pkgs, cls.dangerous_pkgs))
+ return res
+
+ @classmethod
+ def needs_pkgdb_files(cls):
+ """subclass may override this, if it needs PKGDB_FILES database"""
+ return False
+
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
self.longMessage = True
@@ -152,9 +123,20 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
namespace="root/cimv2", classname=self.CLASS_NAME)
def install_pkg(self, pkg, newer=True, repolist=None):
+ """
+ Use this method instead of function in rpmcache in tests.
+ """
+ if repolist is None:
+ repolist = self.test_repos
+ return rpmcache.install_pkg(pkg, newer, repolist)
+
+ def ensure_pkg_installed(self, pkg, newer=True, repolist=None):
+ """
+ Use this method instead of function in rpmcache in tests.
+ """
if repolist is None:
repolist = self.test_repos
- return install_pkg(pkg, newer, repolist)
+ return rpmcache.ensure_pkg_installed(pkg, newer, repolist)
def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103
"""
@@ -170,19 +152,17 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
"root/cimv2", base_cls, cls))
def assertEqual(self, fst, snd, *args, **kwargs):
+ """
+ Modify assertEqual for instance names comparing only important
+ properties.
+ """
if ( isinstance(fst, pywbem.CIMInstanceName)
and isinstance(snd, pywbem.CIMInstanceName)
- and fst.classname == "LMI_SoftwarePackage"
and fst.classname == snd.classname
and fst.namespace == snd.namespace
- and fst.keys() == snd.keys()
- and all(fst[k] == snd[k] for k in ("Name", "SoftwareElementID",
- "SoftwareElementState", "Version"))
- and isinstance(fst["TargetOperatingSystem"], (int, long))
- and isinstance(snd["TargetOperatingSystem"], (int, long))):
+ and fst.keybindings == snd.keybindings):
return True
- return unittest.TestCase.assertEqual(
- self, fst, snd, *args, **kwargs)
+ unittest.TestCase.assertEqual(self, fst, snd, *args, **kwargs)
@classmethod
def setUpClass(cls):
@@ -193,11 +173,8 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password))
cls.run_dangerous = (
os.environ.get('LMI_RUN_DANGEROUS', '0') == '1')
- cls.test_repos = os.environ.get('LMI_SOFTWARE_TEST_REPOS', '')
- if not cls.test_repos:
- cls.test_repos = []
- else:
- cls.test_repos = cls.test_repos.split(',')
+ cls.test_repos = os.environ.get(
+ 'LMI_SOFTWARE_TEST_REPOS', '').split(',')
use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
cls.cache_dir = None
if use_cache:
@@ -210,19 +187,23 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
os.makedirs(cls.cache_dir)
# rpm packages are expected to be in CWD
os.chdir(cls.cache_dir)
- cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache)
- for pkg in cls.pkgdb:
- if not is_installed(pkg.name):
- install_pkg(pkg, repolist=cls.test_repos)
- cls.pkg_files = dict((pkg.name, get_pkg_files(pkg))
- for pkg in cls.pkgdb)
+ cls.safe_pkgs, cls.dangerous_pkgs = rpmcache.get_pkg_database(
+ use_cache=use_cache,
+ dangerous=cls.run_dangerous,
+ repolist=cls.test_repos,
+ cache_dir=cls.cache_dir if use_cache else None)
+ for pkg in cls.dangerous_pkgs:
+ if not rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.install_pkg(pkg, repolist=cls.test_repos)
+ if cls.needs_pkgdb_files():
+ cls.pkgdb_files = cls.get_pkgdb_files()
@classmethod
def tearDownClass(cls):
if cls.run_dangerous:
- for pkg in cls.pkgdb:
- if is_installed(pkg.name):
- remove_pkg(pkg.name)
+ for pkg in cls.dangerous_pkgs:
+ if rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.remove_pkg(pkg.name)
if hasattr(cls, "prev_dir"):
os.chdir(cls.prev_dir)
diff --git a/src/software/test/package.py b/src/software/test/package.py
new file mode 100644
index 0000000..997ee34
--- /dev/null
+++ b/src/software/test/package.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+"""
+Abstraction for RPM package for test purposes.
+"""
+
+import util
+
+class Package(object): #pylint: disable=R0902
+ """
+ Element of test package database. It's a container for package
+ informations. It contains two sets of versions for single package.
+ That's meant for updating tests.
+ """
+ def __init__(self, name, epoch, ver, rel, arch, repo,
+ **kwargs):
+ """
+ Arguments prefixed with 'up_' are for newer package.
+ """
+ self._name = name
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ self._epoch = epoch
+ self._ver = ver
+ self._rel = rel
+ self._arch = arch
+ self._repo = repo
+ safe = kwargs.get('safe', False)
+ self._up_epoch = epoch if safe else kwargs.get('up_epoch', epoch)
+ self._up_ver = ver if safe else kwargs.get('up_ver' , ver)
+ self._up_rel = rel if safe else kwargs.get('up_rel' , rel)
+ self._up_repo = repo if safe else kwargs.get('up_repo', repo)
+
+ def __str__(self):
+ return self.get_nevra()
+
+ @property
+ def name(self): return self._name #pylint: disable=C0111,C0321
+ @property
+ def epoch(self): return self._epoch #pylint: disable=C0111,C0321
+ @property
+ def ver(self): return self._ver #pylint: disable=C0111,C0321
+ @property
+ def rel(self): return self._rel #pylint: disable=C0111,C0321
+ @property
+ def arch(self): return self._arch #pylint: disable=C0111,C0321
+ @property
+ def repo(self): return self._repo #pylint: disable=C0111,C0321
+ @property
+ def nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(False)
+ @property
+ def evra(self): #pylint: disable=C0111,C0321
+ return self.get_evra(False)
+
+ @property
+ def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
+ @property
+ def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
+ @property
+ def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
+ @property
+ def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
+ @property
+ def up_nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(True)
+ @property
+ def up_evra(self): #pylint: disable=C0111,C0321
+ return self.get_evra(True)
+
+ @property
+ def is_safe(self):
+ """
+ @return True if properties prefixed with up_ matches those without
+ it. In that case a package is suited for non-dangerous tests.
+ """
+ return all( getattr(self, a) == getattr(self, 'up_' + a)
+ for a in ('epoch', 'ver', 'rel', 'repo'))
+
+ def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
+ """
+ @newer if True, evr part is made from properties prefixed with 'up_'
+ @return pkg nevra string
+ """
+ if newer:
+ attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
+ else:
+ attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
+ return util.make_nevra(*[getattr(self, '_'+a) for a in attrs],
+ with_epoch=with_epoch)
+
+ def get_evra(self, newer=True):
+ """
+ @newer if True, evr part is made from properties prefixed with 'up_'
+ @return pkg nevra string
+ """
+ attrs = [('up_' if newer else '')+a for a in ('epoch', 'ver', 'rel')]
+ attrs.append('arch')
+ return util.make_evra(*[getattr(self, '_'+a) for a in attrs])
+
diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py
index 9a0e98a..c2dde24 100644
--- a/src/software/test/rpmcache.py
+++ b/src/software/test/rpmcache.py
@@ -18,49 +18,59 @@
#
# Authors: Radek Novacek <rnovacek@redhat.com>
# Authors: Michal Minar <miminar@redhat.com>
+
"""
Creation and manipulation utilities with rpm cache for software tests.
"""
+
import copy
import datetime
import os
import pickle
+import random
import re
from collections import defaultdict
from subprocess import call, check_output, CalledProcessError
+from package import Package
+import util
+
DB_BACKUP_FILE = 'lmi_software_test_cache'
RE_AVAIL_PKG = re.compile(
r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s+'
- r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s+'
+ r'(?P<repo>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
# this won't match the last entry, unless "package\n" is not appended
# at the end of the string
RE_PKG_DEPS = re.compile(
r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
r'(?=^package|\Z)', re.MULTILINE | re.DOTALL)
RE_DEPS_PROVIDERS = re.compile(
r'^\s*dependency:\s*(?P<dependency>.+?)\s*'
- r'(?P<providers>(^\s+provider:.+?$)+)', re.MULTILINE | re.IGNORECASE)
+ r'(?P<providers>(^\s+unsatisfied\s+dependency$)|(^\s+provider:.+?$)+)',
+ re.MULTILINE | re.IGNORECASE)
+RE_DEPS_UNSATISFIED = re.compile(r'^\s+unsatisfied\s+dep.*$', re.IGNORECASE)
RE_PROVIDER = re.compile(
r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
RE_PKG_INFO = re.compile(
r'^Name\s*:\s*(?P<name>[^\s]+).*?'
r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?'
- r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+'
- r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?'
+ r'^Version\s*:\s*(?P<ver>[a-zA-Z0-9._+-]+)\s+'
+ r'^Release\s*:\s*(?P<rel>[^\s]+)\s+.*?'
r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
re.MULTILINE | re.DOTALL | re.IGNORECASE)
RE_REPO = re.compile(
r'(?:^\*?)(?P<name>[^\s/]+\b)(?!\s+id)', re.MULTILINE | re.IGNORECASE)
-# maximum number of packages, that will be selected for testing
+# Maximum number of packages, that will be selected for testing / 2
+# There are 2 sets of test packages (safe and dangerous). When running
+# in dangerous mode, the resulting number will be twice as much.
MAX_PKG_DB_SIZE = 10
# step used to iterate over package names used to check for thery dependencies
# it's a number of packages, that will be passed to yum command at once
@@ -78,104 +88,6 @@ class MissingRPM(InvalidTestCache):
InvalidTestCache.__init__(self,
"Missing package '%s' in test cache!"%pkg_name)
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
- """
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
- """
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr):
- estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
-
-def run_yum(*params, **kwargs):
- """
- Runs yum with params and returns its output
- It's here especially to allow pass a repolist argument, that
- specifies list of repositories, to run the command on.
- """
- cmd = ['yum'] + list(params)
- repolist = kwargs.get('repolist', [])
- if repolist:
- cmd += ['--disablerepo=*']
- cmd += ['--enablerepo='+r for r in repolist]
- try:
- return check_output(cmd)
- except Exception:
- import pdb;pdb.set_trace()
-
-class Package(object): #pylint: disable=R0902
- """
- Element of test package database. It's a container for package
- informations. It contains two sets of versions for single package.
- That's meant for updating tests.
- """
- def __init__(self, name, epoch, ver, rel, arch, repo,
- up_epoch, up_ver, up_rel, up_repo):
- """
- Arguments prefixed with 'up_' are for newer package.
- """
- self._name = name
- self._epoch = epoch
- self._ver = ver
- self._rel = rel
- self._arch = arch
- self._repo = repo
- self._up_epoch = up_epoch
- self._up_ver = up_ver
- self._up_rel = up_rel
- self._up_repo = up_repo
-
- def __str__(self):
- return self.get_nevra()
-
- @property
- def name(self): return self._name #pylint: disable=C0111,C0321
- @property
- def epoch(self): return self._epoch #pylint: disable=C0111,C0321
- @property
- def ver(self): return self._ver #pylint: disable=C0111,C0321
- @property
- def rel(self): return self._rel #pylint: disable=C0111,C0321
- @property
- def arch(self): return self._arch #pylint: disable=C0111,C0321
- @property
- def repo(self): return self._repo #pylint: disable=C0111,C0321
- @property
- def nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(True)
-
- @property
- def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
- @property
- def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
- @property
- def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
- @property
- def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
- @property
- def up_nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(True)
-
- def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
- """
- @newer if True, evr part is made from properties prefixed with 'up_'
- @return pkg nevra string
- """
- if newer:
- attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
- else:
- attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
- return make_nevra(*[getattr(self, '_'+a) for a in attrs],
- with_epoch=with_epoch)
-
def _match_nevr(match):
"""
@param match is a regexp match object with parsed rpm package
@@ -183,22 +95,33 @@ def _match_nevr(match):
"""
epoch = match.group('epoch')
return ( match.group('name')
- , epoch[:-1] if epoch else "0"
- , match.group('version')
- , match.group('release'))
+ , epoch if epoch and epoch.lower() != "(none)" else "0"
+ , match.group('ver')
+ , match.group('rel'))
-def _match2pkg(match):
+def _match2pkg(match, safe=False):
"""
@param match is a regexp match object with attributes:
name, epoch, version, release, arch
+ @param safe if True, the packe will have up_* properties equal to
+ non-prefixed ones, otherwise they will be set to None
@return instance of Package
"""
+ kwargs = {}
+ if safe is True:
+ kwargs['safe'] = True
+ else:
+ for attr in ('epoch', 'ver', 'rel', 'repo'):
+ kwargs['up_'+attr] = None
+ epoch = match.group('epoch')
+ if not epoch or epoch.lower() == "(none)":
+ epoch = '0'
return Package(
match.group('name'),
- match.group('epoch')[:-1] if match.group('epoch') else '0',
- match.group('version'), match.group('release'),
- match.group('arch'), match.groupdict().get('repository', None),
- None, None, None, None)
+ epoch,
+ match.group('ver'), match.group('rel'),
+ match.group('arch'), match.groupdict().get('repo', None),
+ **kwargs)
def _filter_duplicates(installed, avail_str):
"""
@@ -213,7 +136,7 @@ def _filter_duplicates(installed, avail_str):
dups_list = []
cur_package_matches = []
prev_match = None
- system_arch = get_system_architecture()
+ system_arch = util.get_system_architecture()
for match in RE_AVAIL_PKG.finditer(avail_str):
if ( _match_nevr(match) in [ _match_nevr(m)
for m in cur_package_matches]
@@ -246,12 +169,14 @@ def _check_single_pkg_deps(
"""
for match_deps in RE_DEPS_PROVIDERS.finditer(dependencies_str):
providers = []
+ if RE_DEPS_UNSATISFIED.search(match_deps.group('providers')):
+ return False
for match_dep in RE_PROVIDER.finditer(match_deps.group('providers')):
if match_dep.group('name') not in installed:
continue
providers.append(_match2pkg(match_dep))
for provider in providers:
- if is_installed(provider, False):
+ if is_pkg_installed(provider, False):
break
else: # no provider is installed
return False
@@ -261,7 +186,7 @@ def _check_pkg_dependencies(
installed,
dup_list,
number_of_packages=MAX_PKG_DB_SIZE,
- repolist=[]):
+ repolist=None):
"""
Finds packages from dup_list with satisfied (installed) dependencies.
@param installed is a set of installed package names
@@ -274,7 +199,7 @@ def _check_pkg_dependencies(
yum_params = yum_params[:1]
for dups in dups_part:
yum_params.extend([d.get_nevra(newer=False) for d in dups])
- deplist_str = run_yum(*yum_params, repolist=repolist)
+ deplist_str = util.run_yum(*yum_params, repolist=repolist)
matches = RE_PKG_DEPS.finditer(deplist_str)
prev_match = None
for pkgs in dups_part:
@@ -300,14 +225,14 @@ def _check_pkg_dependencies(
break
return dups_no_deps
-def _sorted_db_by_size(pkgdb, repolist=[]):
+def _sorted_db_by_size(pkgdb, repolist=None):
"""
@param pkgdb is a list of lists of packages with common name
@return sorted instances of Package according to their size
"""
yum_params = ['info', '--showduplicates']
yum_params.extend([ps[0].name for ps in pkgdb])
- info_str = run_yum(*yum_params, repolist=repolist)
+ info_str = util.run_yum(*yum_params, repolist=repolist)
pkg_sizes = {}
# to get correct ordering from "yum info" command
# { pkg_name : [(epoch, version, release), ... ] }
@@ -329,7 +254,7 @@ def _sorted_db_by_size(pkgdb, repolist=[]):
if not epoch:
epoch = "0"
pkg_version_order[pkg_name].append((
- epoch, info_match.group('version'), info_match.group('release')))
+ epoch, info_match.group('ver'), info_match.group('rel')))
pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[
:MAX_PKG_DB_SIZE]
@@ -350,7 +275,7 @@ def _get_repo_list():
repos_str = check_output(['yum', 'repolist', '-q'])
return RE_REPO.findall(repos_str)
-def _download_pkgdb(repolist, pkgdb, cache_dir=None):
+def _download_dangerous(repolist, pkgdb, cache_dir=None):
"""
Downloads all rpm packages (old and newer versions) from package database
to current directory.
@@ -385,32 +310,6 @@ def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False):
with_epoch='NEVER' if without_epoch else 'NOT_ZERO')
return os.path.join(cache_dir, nevra) + '.rpm'
-def is_installed(pkg, newer=True):
- """
- Check, whether package is installed.
- """
- if not isinstance(pkg, Package):
- return call(["rpm", "--quiet", "-q", pkg]) == 0
- else:
- try:
- cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}", pkg.get_nevra(
- newer, with_epoch='NEVER') ]
- out = check_output(cmd)
- epoch, nvra = out.split(':') #pylint: disable=E1103
- if not epoch or epoch == "(none)":
- epoch = "0"
- return ( epoch == pkg.epoch
- and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER"))
- except CalledProcessError:
- return False
-
-def rpm_exists(pkg, cache_dir='', newer=True):
- """
- @return True, when rpm package is in cache.
- """
- return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
- or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
-
def get_rpm_name(pkg, cache_dir='', newer=True):
"""
Some packages do not have epoch in their name, even if it's higher than
@@ -425,56 +324,126 @@ def get_rpm_name(pkg, cache_dir='', newer=True):
return path
raise MissingRPM(pkg.name)
-def get_system_architecture():
+def rpm_exists(pkg, cache_dir='', newer=True):
+ """
+ @return True, when rpm package is in cache.
+ """
+ return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
+ or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
+
+
+def remove_pkg(pkg, *args):
+ """
+ Remove package with rpm command.
+ @param pkg is either instance of Package or package name
+ @param args is a list of parameters for rpm command
+ """
+ if isinstance(pkg, Package):
+ pkg = pkg.name
+ call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
+
+def install_pkg(pkg, newer=True, repolist=None):
"""
- @return the system architecture name as seen by rpm
+ Install a specific package.
+ @param pkg is either package name or instance of Package
+ In latter case, a specific version is installed.
+ @param repolist is a list of repositories, that should be
+ used for downloading, if using yum
+ when empty, all enabled repositories are used
"""
- return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm'])
+ if isinstance(pkg, Package):
+ try:
+ rpm_name = get_rpm_name(pkg, newer=newer)
+ call(["rpm", "--quiet", "-i", rpm_name])
+ return
+ except MissingRPM:
+ pass
+ pkg = pkg.name
+ util.run_yum('-q', '-y', 'install', pkg, repolist=repolist)
+
+def ensure_pkg_installed(pkg, newer=True, repolist=None):
+ """
+ Ensures, that specific version of package is installed. If other
+ version is installed, it is removed and reinstalled.
+ """
+ if not isinstance(pkg, Package):
+ raise TypeError("pkg must be a Package instance")
+ if not is_pkg_installed(pkg, newer):
+ if is_pkg_installed(pkg.name):
+ remove_pkg(pkg.name)
+ install_pkg(pkg, newer, repolist)
-def write_pkgdb(pkgdb, cache_dir=''):
+def verify_pkg(pkg):
+ """
+ @return output of command rpm, with verification output for package
+ """
+ if isinstance(pkg, basestring):
+ name = pkg
+ elif isinstance(pkg, Package):
+ name = pkg.name
+ else:
+ raise TypeError("pkg must be either package name or Package instance")
+ return call(["rpm", "--quiet", "-Va", name]) == 0
+
+def has_pkg_config_file(pkg, file_path):
+ """
+ @return True, if file_path is a configuration file of package pkg.
+ """
+ cmd = ['rpm', '-qc', pkg.name]
+ out = check_output(cmd)
+ return file_path in set(out.splitlines()) #pylint: disable=E1103
+
+def has_pkg_doc_file(pkg, file_path):
+ """
+ @return True, if file_path is a documentation file of package pkg.
+ """
+ cmd = ['rpm', '-qd', pkg.name]
+ out = check_output(cmd)
+ return file_path in set(out.splitlines()) #pylint: disable=E1103
+
+def is_pkg_installed(pkg, newer=True):
+ """
+ Check, whether package is installed.
+ """
+ if not isinstance(pkg, Package):
+ return call(["rpm", "--quiet", "-q", pkg]) == 0
+ else:
+ cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra(
+ newer, with_epoch='NEVER') ]
+ try:
+ out = check_output(cmd).splitlines()[0]
+ epoch, nvra = out.split(':') #pylint: disable=E1103
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ return ( epoch == getattr(pkg, 'up_epoch' if newer else 'epoch')
+ and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER"))
+ except CalledProcessError:
+ return False
+
+def write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir=''):
"""
Writes package database into a file named DB_BACKUP_FILE.
"""
with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file:
- pickle.dump((datetime.datetime.now(), pkgdb), db_file)
+ data = (datetime.datetime.now(), safe_pkgs, dangerous_pkgs)
+ pickle.dump(data, db_file)
def load_pkgdb(cache_dir=''):
"""
This is inverse function to _write_pkgdb().
- @return package database loaded from file
+ @return (safe, dangerous) package lists loaded from file
"""
with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file:
- date_time, pkgdb = pickle.load(db_file)
+ _, safe, dangerous = pickle.load(db_file)
#print "Loaded package database from: %s" % date_time
- return pkgdb
+ return safe, dangerous
-def get_pkg_database(force_update=False, use_cache=True,
- cache_dir='',
- repolist=[]):
+def make_dangerous_list(installed, repolist=None):
"""
- Checks yum database for available packages, that have at least two
- different versions in repositories. Only not installed ones with
- all of their dependencies intalled are selected.
- And from those, few of the smallest are downloaded as rpms.
- @return list of instances of Package of selected packages
+ This makes a list of instances of Package for dangerous tests.
"""
- if ( use_cache and not force_update
- and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
- pkgdb = load_pkgdb(cache_dir)
- valid_db = True
- for pkg in pkgdb:
- if ( not rpm_exists(pkg, cache_dir, False)
- or not rpm_exists(pkg, cache_dir, True)):
- valid_db = False
- #print "Old package database is not valid"
- break
- if valid_db:
- return pkgdb
- #print "Getting installed packages"
- installed = set(check_output( #pylint: disable=E1103
- ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
#print "Getting all available packages"
- avail_str = run_yum('list', 'available', '--showduplicates',
+ avail_str = util.run_yum('list', 'available', '--showduplicates',
repolist=repolist)
# list of lists of packages with the same name, longer than 2
#print "Finding duplicates"
@@ -485,11 +454,88 @@ def get_pkg_database(force_update=False, use_cache=True,
number_of_packages=MAX_PKG_DB_SIZE*5,
repolist=repolist)
#print "Selecting the smallest ones"
- pkgdb = _sorted_db_by_size(selected, repolist=repolist)
+ return _sorted_db_by_size(selected, repolist=repolist)
+
+def make_safe_list(installed, exclude=None):
+ """
+ Makes list of installed packages for non-dangerous tests.
+ @param installed is a list of installed packages names
+ @param exclude is a list of package names, that won't appear in result
+ """
+ if exclude is None:
+ exclude = set()
+ base = list(installed)
+ random.shuffle(base)
+ res = []
+ i = 0
+ while len(res) < MAX_PKG_DB_SIZE and i < len(base):
+ name = base[i]
+ i += 1
+ if name in exclude:
+ continue
+ cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", name ]
+ envra = check_output(cmd).splitlines()[0]
+ res.append(_match2pkg(util.RE_ENVRA.match(envra), safe=True))
+ return res
+
+def get_pkg_database(
+ force_update=False,
+ use_cache=True,
+ dangerous=False,
+ cache_dir='',
+ repolist=None):
+ """
+ Checks yum database for available packages, that have at least two
+ different versions in repositories. Only not installed ones with
+ all of their dependencies intalled are selected.
+ And from those, few of the smallest are downloaded as rpms.
+ @param dangerous whether to load available, not installed
+ packages for dangerous tests
+ @return (safe, dangerous)
+ where
+ safe is a list of instances of Package, representing installed
+ software, these should be used for not-dangerous tests;
+ both older
+ dangerous is a list of instances of Package of selected packages,
+ that are not installed, but available; instances contain
+ both newer and older version of package
+ """
+ dangerous_pkgs = []
+ if ( use_cache and not force_update
+ and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
+ safe_pkgs, dangerous_pkgs = load_pkgdb(cache_dir)
+ valid_db = True
+ if len(safe_pkgs) < MAX_PKG_DB_SIZE:
+ valid_db = False
+ elif not dangerous and len(dangerous_pkgs) > 0:
+ dangerous_pkgs = []
+ elif dangerous and len(dangerous_pkgs) == 0:
+ valid_db = False
+ else:
+ for pkg in safe_pkgs:
+ if not is_pkg_installed(pkg):
+ valid_db = False
+ break
+ for pkg in dangerous_pkgs:
+ if ( not rpm_exists(pkg, cache_dir, False)
+ or not rpm_exists(pkg, cache_dir, True)):
+ valid_db = False
+ #print "Old package database is not valid"
+ break
+ if valid_db:
+ return (safe_pkgs, dangerous_pkgs)
+ #print "Getting installed packages"
+ installed = set(check_output( #pylint: disable=E1103
+ ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
+ if dangerous:
+ dangerous_pkgs = make_dangerous_list(installed)
+ safe_pkgs = make_safe_list(installed, exclude=set(
+ pkg.name for pkg in dangerous_pkgs))
+
if use_cache:
- repolist = _get_repo_list()
- _download_pkgdb(repolist, pkgdb, cache_dir)
+ repolist = _get_repo_list() if repolist in (None, []) else repolist
+ _download_dangerous(repolist, dangerous_pkgs, cache_dir)
#print "Backing up database information"
- write_pkgdb(pkgdb, cache_dir)
- return pkgdb
+ write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir)
+ return (safe_pkgs, dangerous_pkgs)
diff --git a/src/software/test/run.py b/src/software/test/run.py
index 950b0c9..00ef304 100755
--- a/src/software/test/run.py
+++ b/src/software/test/run.py
@@ -68,6 +68,7 @@ def parse_cmd_line():
parser.add_argument("-p", "--password",
default=os.environ.get("LMI_CIMOM_PASSWORD", ''),
help="User's password.")
+
dangerous_group = parser.add_mutually_exclusive_group()
dangerous_group.add_argument("--run-dangerous", action="store_true",
default=(os.environ.get('LMI_RUN_DANGEROUS', '0') == '1'),
@@ -78,6 +79,17 @@ def parse_cmd_line():
dest="run_dangerous",
default=os.environ.get('LMI_RUN_DANGEROUS', '0') == '1',
help="Disable dangerous tests.")
+
+ tedious_group = parser.add_mutually_exclusive_group()
+ tedious_group.add_argument("--run-tedious", action="store_true",
+ default=(os.environ.get('LMI_RUN_TEDIOUS', '0') == '1'),
+ help="Run also tedious (long running) for this machine."
+ " Overrides environment variable LMI_RUN_TEDIOUS.")
+ tedious_group.add_argument('--no-tedious', action="store_false",
+ dest="run_tedious",
+ default=os.environ.get('LMI_RUN_TEDIOUS', '0') == '1',
+ help="Disable tedious tests.")
+
cache_group = parser.add_mutually_exclusive_group()
cache_group.add_argument("-c", "--use-cache", action="store_true",
default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
@@ -160,6 +172,8 @@ def prepare_environment(args):
os.environ['LMI_CIMOM_PASSWORD'] = args.password
os.environ['LMI_RUN_DANGEROUS'] = (
'1' if args.run_dangerous else '0')
+ os.environ["LMI_RUN_TEDIOUS"] = (
+ '1' if args.run_tedious else '0')
os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0'
if args.use_cache:
os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR
@@ -270,9 +284,12 @@ def main():
repolist = args.test_repos.split(',')
else:
repolist = []
- rpmcache.get_pkg_database(args.force_update, args.use_cache,
- CACHE_DIR, repolist=repolist)
- #rpmcache.make_pkg_database(packages
+ rpmcache.get_pkg_database(
+ args.force_update,
+ args.use_cache,
+ dangerous=args.run_dangerous,
+ cache_dir=CACHE_DIR,
+ repolist=repolist)
prepare_environment(args)
test_program = unittest.main(argv=ut_args,
testLoader=LMITestLoader(), exit=False)
diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py
new file mode 100755
index 0000000..dc798d8
--- /dev/null
+++ b/src/software/test/test_hosted_software_collection.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_MemberOfSoftwareCollection provider.
+"""
+
+import pywbem
+import socket
+import unittest
+
+import base
+
+class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_HostedSoftwareCollection"
+ KEYS = ("Antecedent", "Dependent")
+
+ def make_op(self):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["Antecedent"] = pywbem.CIMInstanceName(
+ classname="Linux_ComputerSystem",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "CreationClassName" : "Linux_ComputerSystem",
+ "Name" : socket.gethostname()
+ }))
+ objpath["Dependent"] = pywbem.CIMInstanceName(
+ classname="LMI_SystemSoftwareCollection",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : "LMI:SystemSoftwareCollection"
+ }))
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ objpath = self.make_op()
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ self.assertEqual(inst.path, objpath)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath, inst.path)
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ # try with CIM_ prefix
+ antecedent = objpath["Antecedent"].copy()
+ objpath["Antecedent"].classname = "CIM_ComputerSystem"
+
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ objpath["Antecedent"] = antecedent.copy()
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ # try with CIM_ prefix also for CreationClassName
+ objpath["Antecedent"]["CreationClassName"] = "CIM_ComputerSystem"
+
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ objpath["Antecedent"] = antecedent.copy()
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ def test_enum_instances(self):
+ """
+ Tests EnumInstances call on installed packages.
+ """
+ objpath = self.make_op()
+ insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+ self.assertEqual(1, len(insts))
+ self.assertEqual(objpath, insts[0].path)
+ self.assertEqual(sorted(insts[0].properties.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath, insts[0].path)
+ for key in self.KEYS:
+ self.assertEqual(insts[0][key], insts[0].path[key])
+
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ objpath = self.make_op()
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertEqual(1, len(inames))
+ self.assertEqual(objpath, inames[0])
+
+ def test_get_antecedent_referents(self):
+ """
+ Test ReferenceNames for ComputerSystem.
+ """
+ objpath = self.make_op()
+ refs = self.conn.ReferenceNames(
+ Role="Antecedent",
+ ObjectName=objpath["Antecedent"])
+ self.assertGreater(len(refs), 0)
+ refs = [ r for r in refs if "Dependent" in r
+ and r["Dependent"].classname == \
+ "LMI_SystemSoftwareCollection" ]
+ self.assertEqual(1, len(refs))
+ ref = refs[0]
+ self.assertEqual(ref, objpath)
+
+ @base.mark_dangerous
+ def test_get_dependent_referents(self):
+ """
+ Test ReferenceNames for SystemSoftwareCollection.
+ """
+ objpath = self.make_op()
+ refs = self.conn.AssociatorNames(
+ ObjectName=objpath["Dependent"],
+ Role="Dependent",
+ ResultRole="Antecedent",
+ ResultClass="Linux_ComputerSystem")
+ self.assertEqual(1, len(refs))
+ ref = refs[0]
+ self.assertEqual(objpath["Antecedent"], ref)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestHostedSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py
new file mode 100755
index 0000000..4d375c4
--- /dev/null
+++ b/src/software/test/test_member_of_software_collection.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_MemberOfSoftwareCollection provider.
+"""
+
+import pywbem
+import unittest
+
+import base
+
+class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase):
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_MemberOfSoftwareCollection"
+ KEYS = ("Collection", "Member")
+
+ def make_op(self, pkg, newer=True):
+ """
+ @return object path of MembeOfSoftwareCollection association
+ """
+ objpath = self.objpath.copy()
+ objpath["Collection"] = pywbem.CIMInstanceName(
+ classname="LMI_SystemSoftwareCollection",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : "LMI:SystemSoftwareCollection"
+ }))
+ objpath["Member"] = pywbem.CIMInstanceName(
+ classname="LMI_SoftwareIdentity",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : 'LMI:PKG:' + pkg.get_nevra(newer=newer,
+ with_epoch="ALWAYS")
+ }))
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath,
+ "Object paths should match for package %s"%pkg)
+ for key in self.KEYS:
+ self.assertTrue(inst.properties.has_key(key),
+ "OP is missing \"%s\" key for package %s"%(key, pkg))
+ self.assertEqual(objpath, inst.path)
+
+ @base.mark_tedious
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertGreater(len(inames), 0)
+ objpath = self.make_op(self.safe_pkgs[0])
+ for iname in inames:
+ self.assertIsInstance(iname, pywbem.CIMInstanceName)
+ self.assertEqual(iname.namespace, 'root/cimv2')
+ self.assertEqual(iname.classname, self.CLASS_NAME)
+ self.assertEqual(sorted(iname.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath["Collection"], iname["Collection"])
+ nevra_set = set(i["Member"]["InstanceID"] for i in inames)
+ for pkg in self.safe_pkgs:
+ nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS")
+ self.assertTrue(nevra in nevra_set,
+ 'Missing nevra "%s".' % nevra)
+
+# @base.mark_tedious
+# def test_enum_instances(self):
+# """
+# Tests EnumInstances call on installed packages.
+# """
+# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+# self.assertGreater(len(insts), 0)
+# for inst in insts:
+# self.assertIsInstance(inst, pywbem.CIMInstance)
+# self.assertEqual(inst.namespace, 'root/cimv2')
+# self.assertEqual(inst.classname, self.CLASS_NAME)
+# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS))
+# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+# nevra_set = set(i["Member"]["InstanceID"] for i in insts)
+# for pkg in self.safe_pkgs:
+# self.assertIn(pkg.get_nevra(with_epoch="ALWAYS"), nevra_set)
+
+ @base.mark_tedious
+ def test_get_collection_referents(self):
+ """
+ Test ReferenceNames for SystemSoftwareCollection.
+ """
+ objpath = self.make_op(self.safe_pkgs[0])
+ refs = self.conn.AssociatorNames(
+ Role="Collection",
+ ObjectName=objpath["Collection"],
+ ResultRole="Member",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertGreater(len(refs), 0)
+ for ref in refs:
+ self.assertIsInstance(ref, pywbem.CIMInstanceName)
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(sorted(ref.keys()), ["InstanceID"])
+ self.assertTrue(ref["InstanceID"].startswith("LMI:PKG:"))
+ nevra_set = set(i["InstanceID"] for i in refs)
+ # NOTE: installed packages might not be available
+ for pkg in self.safe_pkgs:
+ nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS")
+ self.assertTrue(nevra in nevra_set,
+ 'Missing nevra "%s".' % nevra)
+
+ def test_get_member_referents(self):
+ """
+ Test ReferenceNames for SoftwareIdentity.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ refs = self.conn.AssociatorNames(
+ ObjectName=objpath["Member"],
+ Role="Member",
+ ResultRole="Collection",
+ ResultClass="LMI_SystemSoftwareCollection")
+ self.assertEqual(len(refs), 1)
+ ref = refs[0]
+ self.assertEqual(objpath["Collection"], ref)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestMemberOfSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_software_file_check.py b/src/software/test/test_software_file_check.py
deleted file mode 100755
index 7b97ee9..0000000
--- a/src/software/test/test_software_file_check.py
+++ /dev/null
@@ -1,527 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareFileCheck provider.
-"""
-
-import hashlib
-import os
-import pywbem
-import re
-import subprocess
-import unittest
-from collections import namedtuple
-
-import common
-
-RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*')
-
-PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103
- "exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime")
-
-class TestSoftwareFileCheck(common.SoftwareBaseTestCase):
- #pylint: disable=R0904
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwareFileCheck"
- KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState"
- , "TargetOperatingSystem", "Version")
-
- hash_num2algo = {
- 1 : hashlib.md5, #pylint: disable=E1101
- 2 : hashlib.sha1, #pylint: disable=E1101
- 8 : hashlib.sha256, #pylint: disable=E1101
- 9 : hashlib.sha384, #pylint: disable=E1101
- 10 : hashlib.sha512, #pylint: disable=E1101
- 11 : hashlib.sha224 #pylint: disable=E1101
- }
-
- hash_num2cmd = {
- 1 : "md5sum",
- 2 : "sha1sum",
- 8 : "sha256sum",
- 9 : "sha384sum",
- 10 : "sha512sum",
- 11 : "sha224sum"
- }
-
- hash_num2length = {
- 1 : 32,
- 2 : 40,
- 8 : 64,
- 9 : 96,
- 10 : 128,
- 11 : 56
- }
-
- def make_op(self, pkg, filename, newer=True):
- """
- @return object path of LMI_SoftwareFileCheck
- """
- objpath = self.objpath.copy()
- objpath["Name"] = filename
- objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver")
- objpath["CheckID"] = "%s#%s" % (pkg.name, filename)
- objpath["SoftwareElementState"] = pywbem.Uint16(2)
- objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
- objpath["SoftwareElementID"] = pkg.get_nevra(newer)
- return objpath
-
- def assertEqualSEID(self, id1, id2, msg=None):
- """
- This is for comparison of SoftwareElementID property values.
- """
- if ( not isinstance(id1, basestring)
- or not isinstance(id2, basestring)):
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- match1 = common.RE_NEVRA.match(id1)
- match2 = common.RE_NEVRA.match(id2)
- if not match1 or not match2:
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- if any( match1.group(g) != match2.group(g)
- for g in ('name', 'ver', 'rel', 'arch')):
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- epoch1 = match1.group('epoch')
- epoch2 = match2.group('epoch')
- if not epoch1:
- epoch1 = '0'
- if not epoch2:
- epoch2 = '0'
- if epoch1 != epoch2:
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- return True
-
- def assertEqual(self, op1, op2, msg=None):
- """
- This is override for object paths, that allows some differences
- (like missing epoch in SoftwareElementID).
- """
- if ( not isinstance(op1, pywbem.CIMInstanceName)
- or not isinstance(op2, pywbem.CIMInstanceName)
- or op1.classname != op2.classname
- or op1.namespace != op2.namespace
- or op1.keybindings.keys() != op2.keybindings.keys()
- or any(op1[a] != op2[a] for a in
- ( 'Name', 'Version', 'CheckID', 'SoftwareElementState'
- , 'TargetOperatingSystem'))):
- return common.SoftwareBaseTestCase.assertEqual(self, op1, op2, msg)
- return self.assertEqualSEID(
- op1['SoftwareElementID'], op2['SoftwareElementID'], msg)
-
- def make_checksum_str(self, csumnum, filename):
- """
- @return checksum of installed file
- """
- return RE_CHECKSUM.match(subprocess.check_output([
- self.hash_num2cmd[csumnum], filename])).group(1).lower()
-
- def do_check_symlink(self, pkg, filepath, inst):
- """
- Assert some details about symlink.
- """
- target = os.readlink(filepath)
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(3), inst["FileType"],
- "Unexpected file type of symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected uid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode of symlink for %s:%s" %(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(target, inst["LinkTarget"],
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]],
- inst["FileChecksum"],
- "Unexpected hash of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime of symlink for %s:%s"%(pkg.name, filepath))
-
- # modify owner
- prev_user = inst["FileUserID"]
- #prev_mtime = inst["LastModificationTime"]
- #prev_pflags = PassedFlags(*inst["PassedFlags"])
- os.lchown(filepath, stats.st_uid + 1, -1)
- inst = self.conn.GetInstance(InstanceName=inst.path)
- self.assertEqual(inst["ExpectedFileUserID"] + 1, inst["FileUserID"],
- "Unexpected uid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(prev_user + 1, inst["FileUserID"],
- "Unexpected uid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- #self.assertGreater(inst["LastModificationTime"], prev_mtime)
-
- self.assertTrue(cur_pflags.exists,
- "Symlink %s:%s should exist." %(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "File type should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.size,
- "File size should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "File mode should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.checksum,
- "File checksum should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device number should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.ltarget,
- "Link target should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.uid,
- "Uid should not match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "Gid shoud match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mtime,
- "Mtime should match for symlink %s:%s"%(pkg.name, filepath))
-
- # modify link_target
- os.remove(filepath)
- os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath)
- os.lchown(filepath, inst["ExpectedFileUserID"],
- inst["ExpectedFileGroupID"])
-
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertGreater(len(inst["LinkTarget"]),
- len(inst["ExpectedLinkTarget"]))
-
- self.assertTrue(cur_pflags.exists,
- "File %s:%s should exist"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "File type should match for symlink %s:%s"%(pkg.name, filepath))
- # file size not checked for symlinks
- #self.assertFalse(cur_pflags.size,
- #"File size should not match for symlink %s:%s"%(
- #pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "File mode should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.checksum,
- "Checksum should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device numbers should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Link target should not match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "File uid should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "File gid should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mtime,
- "File mtime should match for symlink %s:%s"%(pkg.name, filepath))
-
- def do_check_directory(self, pkg, filepath, inst):
- """
- Assert some details about directory.
- """
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(2), inst["FileType"],
- "Unexpected type for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected uid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size for directory %s:%s"%(pkg.name, filepath))
- self.assertIs(inst["LinkTarget"], None)
- self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]],
- inst["FileChecksum"],
- "Unexpected checksum for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime for directory %s:%s"%(pkg.name, filepath))
-
- def do_check_file(self, pkg, filepath, inst):
- """
- Assert some details about regurar file.
- """
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(1), inst["FileType"],
- "Unexpected file type for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected file uid for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid for regular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size for reqular file %s:%s"%(pkg.name, filepath))
- self.assertIs(inst["LinkTarget"], None)
- csum = self.make_checksum_str(inst['FileChecksumType'], filepath)
- self.assertEqual(csum, inst["FileChecksum"].lower(),
- "Unexpected checksum for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
-
- # make it longer
- with open(filepath, "a+") as fobj:
- fobj.write("data\n")
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertGreater(inst["FileSize"], inst["ExpectedFileSize"],
- "File size should be greater, then expected for reqular file"
- " %s:%s"%(pkg.name, filepath))
- self.assertGreater(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
-
- self.assertTrue(cur_pflags.exists,
- "Regular file should exist %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "Type of regular file should match for %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "Mode should match for regular file %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Checksum should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device number should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.ltarget,
- "Link target should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "File uid should match for %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "File gid should match for %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "File mtime should not match for %s:%s"%(pkg.name, filepath))
-
- # change file type
- os.remove(filepath)
- os.symlink(filepath, filepath)
- os.lchown(filepath, inst["ExpectedFileUserID"],
- inst["ExpectedFileGroupID"])
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertNotEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"],
- "Link target should not match for %s:%s"%(pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"],
- "File size should not match for %s:%s"%(pkg.name, filepath))
- self.assertGreater(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "File mtime should be greater than expected for %s:%s"%(
- pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileType"], inst["FileType"],
- "File type should not match for %s:%s"%(pkg.name, filepath))
- self.assertEqual(pywbem.Uint16(3), inst["FileType"],
- "File type should match for %s:%s"%(pkg.name, filepath))
-
- self.assertTrue(cur_pflags.exists,
- "Regular file %s:%s should exist"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.type,
- "Regular file type should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mode,
- "File mode should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Checksum should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Link target should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "Regular file's uid should match for %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "Regular file's gid should match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "Regular file's mtime should not match for %s:%s"%(
- pkg.name, filepath))
-
- # remove it
- os.remove(filepath)
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"],
- "Link target does not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"],
- "File size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertIsNone(inst["LastModificationTime"])
- self.assertIsNone(inst["FileType"])
- self.assertIsNone(inst["FileChecksum"])
- self.assertIsNone(inst["FileMode"])
- self.assertIsNone(inst["FileUserID"])
- self.assertIsNone(inst["FileGroupID"])
-
- self.assertFalse(cur_pflags.exists,
- "Regular file %s:%s should not exist"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.type,
- "Regular file's type should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Regular file's size should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mode,
- "Regular file's mode should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Regular file's checksum should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.dev,
- "Regular file's dev number should not match %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Regular file's link target should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.uid,
- "Regular file's uid should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.gid,
- "Regular file's guid should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "Regular file's mtime should not match for %s:%s"%(
- pkg.name, filepath))
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call.
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if ( common.is_installed(pkg)
- and not common.verify_pkg(pkg.name)):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
-
- inst = self.conn.GetInstance(
- InstanceName=objpath,
- LocalOnly=False)
- self.assertIsInstance(inst, pywbem.CIMInstance)
- self.assertEqual(objpath, inst.path,
- msg="Object paths of instance must match for %s:%s"%(
- pkg.name, filepath))
- for key in self.KEYS:
- if key.lower() == "softwareelementid":
- self.assertEqualSEID(inst[key], objpath[key],
- "OP key %s values should match for %s:%s"%(
- key, pkg.name, filepath))
- elif key.lower() == "targetoperatingsystem":
- self.assertIsInstance(objpath[key], (int, long))
- else:
- self.assertEqual(objpath[key], inst[key],
- "OP key %s values should match for %s:%s"%(
- key, pkg.name, filepath))
-
- self.assertTrue(inst["FileExists"],
- "File %s:%s must exist"%(pkg.name, filepath))
- self.assertEqual(10, len(inst["PassedFlags"]),
- "PassedFlags must have constant length")
- for i, flag in enumerate(inst["PassedFlags"]):
- self.assertTrue(flag is True,
- "Flag \"%s\" should all match for file %s:%s"%(
- inst["PassedFlagsDescriptions"][i], pkg.name, filepath))
- for prop in ( "FileType", "FileUserID", "FileGroupID"
- , "FileMode", "FileSize", "LinkTarget"
- , "FileChecksum", "FileModeFlags"):
- if ( ( os.path.islink(filepath)
- or (not os.path.isfile(filepath)))
- and prop == "FileSize"):
- continue
- self.assertEqual(inst["Expected"+prop], inst[prop],
- "%s should match for %s:%s"%(prop, pkg.name, filepath))
- if os.path.islink(filepath):
- self.do_check_symlink(pkg, filepath, inst)
- elif os.path.isdir(filepath):
- self.do_check_directory(pkg, filepath, inst)
- elif os.path.isfile(filepath):
- self.do_check_file(pkg, filepath, inst)
-
- @common.mark_dangerous
- def test_invoke_method(self):
- """
- Tests Invoke method invocation.
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if common.is_installed(pkg) and not common.verify_pkg(pkg.name):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
-
- (rval, _) = self.conn.InvokeMethod(
- MethodName="Invoke",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- msg="InvokeMethod should be successful for %s:%s"%(
- pkg.name, filepath))
-
- # modify file
- if os.path.isfile(filepath):
- os.remove(filepath)
- else:
- os.lchown(filepath, os.stat(filepath).st_uid + 1, -1)
- (rval, _) = self.conn.InvokeMethod(
- MethodName="Invoke",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(2), rval,
- "InvokeMethod should not pass for modified file %s:%s"%(
- pkg.name, filepath))
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwareFileCheck)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py
new file mode 100755
index 0000000..934d4c9
--- /dev/null
+++ b/src/software/test/test_software_identity.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_SoftwareIdentity provider.
+"""
+
+from datetime import datetime, timedelta
+import itertools
+import pywbem
+import unittest
+
+import base
+import rpmcache
+import util
+
+class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_SoftwareIdentity"
+ KEYS = ("InstanceID", )
+
+ def make_op(self, pkg, newer=True):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["InstanceID"] = 'LMI:PKG:'+pkg.get_nevra(newer, "ALWAYS")
+ return objpath
+
+ @base.mark_dangerous
+ def test_get_instance(self):
+ """
+ Dangerous test, making sure, that properties are set correctly
+ for installed and removed packages.
+ """
+ for pkg in self.dangerous_pkgs:
+ if rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.remove_pkg(pkg.name)
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsNone(inst["InstallDate"])
+ time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \
+ - timedelta(seconds=0.01)
+ rpmcache.install_pkg(pkg)
+ inst2 = self.conn.GetInstance(
+ InstanceName=objpath, LocalOnly=False)
+ self.assertIsNotNone(inst2["InstallDate"])
+ self.assertGreater(inst2["InstallDate"].datetime, time_stamp)
+
+ def test_get_instance_safe(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath,
+ "Object paths should match for package %s"%pkg)
+ for key in self.KEYS:
+ self.assertTrue(inst.properties.has_key(key),
+ "OP is missing \"%s\" key for package %s"%(key, pkg))
+ self.assertIsInstance(inst["Caption"], basestring)
+ self.assertIsInstance(inst["Description"], basestring)
+ self.assertEqual(pkg.up_evra, inst["VersionString"],
+ "VersionString does not match evra for pkg %s" % pkg)
+ self.assertTrue(inst['IsEntity'])
+ self.assertEqual(pkg.name, inst["Name"],
+ "Name does not match for pkg %s" % pkg)
+ self.assertIsInstance(inst["Epoch"], pywbem.Uint32,
+ "Epoch does not match for pkg %s" % pkg)
+ self.assertEqual(int(pkg.epoch), inst["Epoch"])
+ self.assertEqual(pkg.ver, inst["Version"],
+ "Version does not match for pkg %s" % pkg)
+ self.assertEqual(pkg.rel, inst["Release"],
+ "Release does not match for pkg %s" % pkg)
+ self.assertEqual(pkg.arch, inst["Architecture"],
+ "Architecture does not match for pkg %s" % pkg)
+
+ # try to leave out epoch part
+ if pkg.epoch == "0":
+ op_no_epoch = objpath.copy()
+ op_no_epoch["SoftwareElementID"] = util.make_nevra(pkg.name,
+ pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER")
+ self.assertNotIn(":", op_no_epoch["SoftwareElementID"])
+ inst = self.conn.GetInstance(
+ InstanceName=op_no_epoch, LocalOnly=False)
+ self.assertIn(inst.path, (objpath, op_no_epoch))
+
+ @base.mark_tedious
+ def test_enum_instance_names_safe(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertGreater(len(inames), 0)
+ for iname in inames:
+ self.assertIsInstance(iname, pywbem.CIMInstanceName)
+ self.assertEqual(iname.namespace, 'root/cimv2')
+ self.assertEqual(sorted(iname.keys()), sorted(self.KEYS))
+ nevra_set = set(i["InstanceID"] for i in inames)
+ for pkg in self.safe_pkgs:
+ self.assertIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"),
+ nevra_set)
+
+# @base.mark_tedious
+# def test_enum_instances(self):
+# """
+# Tests EnumInstances call on installed packages.
+# """
+# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+# self.assertGreater(len(insts), 0)
+# for inst in insts:
+# self.assertIsInstance(inst, pywbem.CIMInstance)
+# self.assertEqual(inst.namespace, 'root/cimv2')
+# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS))
+# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+# nevra_set = set()
+# name_set = set()
+# for inst in insts:
+# nevra_set.add(inst["InstanceID"])
+# name_set.add(inst["Name"])
+# for pkg in self.safe_pkgs:
+# self.assertIn("LMI:PKG:"+pkg.get_nevra(with_epoch="ALWAYS"),
+# nevra_set)
+# self.assertIn(pkg.name, name_set)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSoftwareIdentity)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py
deleted file mode 100755
index 93669f9..0000000
--- a/src/software/test/test_software_installed_package.py
+++ /dev/null
@@ -1,351 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareInstalledPackage provider.
-"""
-
-import os
-import pywbem
-import socket
-import stat
-import unittest
-
-import common
-
-def make_path_tuple(objpath):
- return tuple(objpath[a] for a in ("SoftwareElementID", "Name", "Version",
- "SoftwareElementState"))
-
-class TestSoftwareInstalledPackage(common.SoftwareBaseTestCase):
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwareInstalledPackage"
- KEYS = ("Software", "System")
-
- def make_op(self, pkg, newer=True, ses=2):
- """
- @param ses SoftwareElementState property value
- @return object path of LMI_SoftwareInstaledPackage
- """
- objpath = self.objpath.copy()
- pkg_op = pywbem.CIMInstanceName(
- classname="LMI_SoftwarePackage", namespace="root/cimv2",
- keybindings={
- "Name" : pkg.name,
- "SoftwareElementID" : pkg.get_nevra(newer, 'ALWAYS'),
- "SoftwareElementState" : pywbem.Uint16(ses),
- "TargetOperatingSystem" : pywbem.Uint16(36),
- "Version" : getattr(pkg, 'up_ver' if newer else 'ver') })
- system_op = pywbem.CIMInstanceName(
- classname="Linux_ComputerSystem", namespace="root/cimv2",
- keybindings={
- "CreationClassName" : "Linux_ComputerSystem",
- "Name" : socket.gethostname() })
- objpath["Software"] = pkg_op
- objpath["System"] = system_op
- return objpath
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call on packages from our rpm cache.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg):
- self.install_pkg(pkg)
- objpath = self.make_op(pkg)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertIsSubclass(inst.path.classname, self.CLASS_NAME)
- self.assertIsSubclass(
- inst.path['System'].classname, objpath['System'].classname)
- self.assertIsSubclass(inst.path['Software'].classname,
- objpath['Software'].classname)
- self.assertEqual(2, len(inst.path.keys()))
- for key in self.KEYS:
- self.assertEqual(inst[key], inst.path[key],
- "Object paths should be the same for %s"%pkg)
- self.assertEqual(inst['Software'], objpath['Software'],
- "Software key reference should match for %s"%pkg)
- common.remove_pkg(pkg.name)
- objpath['Software']['SoftwareElementState'] = pywbem.Uint16(1)
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0],
- "Package %s should not be installed"%pkg)
-
- @common.mark_dangerous
- def test_enum_instance_names(self):
- """
- Tests EnumInstances call.
- TODO: test this in non-dangerous way
- """
- pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None
- if pkg and common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- insts1 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
- if pkg:
- self.install_pkg(pkg)
- insts2 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
- self.assertEqual(len(insts1) + 1, len(insts2))
-
- if pkg:
- objpath = self.make_op(pkg)
- self.assertIn(make_path_tuple(objpath['Software']),
- set(make_path_tuple(inst['Software']) for inst in insts2))
- self.assertTrue(all(isinstance(inst, pywbem.CIMInstanceName)
- for inst in insts1))
-
- @common.mark_dangerous
- def test_enum_instances(self):
- """
- Tests EnumInstances call.
- """
- pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None
- if pkg and not common.is_installed(pkg.name):
- self.install_pkg(pkg.name)
- insts1 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
- if pkg:
- common.remove_pkg(pkg.name)
- insts2 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
- objpath = self.make_op(pkg)
- self.assertEqual(len(insts2) + 1, len(insts1))
- path_values = set(make_path_tuple(p["Software"]) for p in insts1)
- self.assertIn(make_path_tuple(objpath['Software']), path_values)
- path_values = set(make_path_tuple(p["Software"]) for p in insts2)
- self.assertNotIn(make_path_tuple(objpath['Software']), path_values)
-
- self.assertTrue(all(inst['Software'] == inst.path['Software']
- for inst in insts1))
- self.assertTrue(all(inst['System'] == inst.path['System']
- for inst in insts1))
-
- @common.mark_dangerous
- def test_create_instance(self):
- """
- Tests CreateInstance call.
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s should not be installed"%pkg)
- objpath = self.make_op(pkg, ses=1)
- inst = pywbem.CIMInstance(classname=objpath.classname, path=objpath)
- inst["Software"] = objpath["Software"]
- inst["System"] = objpath["System"]
- iname = self.conn.CreateInstance(NewInstance=inst)
- self.assertIsInstance(iname, pywbem.CIMInstanceName)
- objpath["Software"]["SoftwareElementState"] = pywbem.Uint16(2)
- self.assertEqual(iname["Software"], objpath["Software"],
- "Software key reference should match for %s"%pkg)
- self.assertIsInstance(iname["System"], pywbem.CIMInstanceName)
- self.assertIsSubclass(iname["System"].classname,
- objpath["System"].classname)
- self.assertEqual(set(iname.keys()), set(objpath.keys()),
- "Keys of object paths should be the same for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s should be installed"%pkg)
-
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.CreateInstance(NewInstance=inst)
- self.assertEqual(pywbem.CIM_ERR_ALREADY_EXISTS,
- cmngr.exception.args[0],
- "Package %s should already be installed"%pkg)
-
- @common.mark_dangerous
- def test_delete_instance(self):
- """
- Tests DeleteInstance call.
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- objpath = self.make_op(pkg)
- self.conn.DeleteInstance(objpath)
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s must be uninstalled"%pkg)
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.DeleteInstance(objpath)
- self.assertIn(cmngr.exception.args[0],
- [pywbem.CIM_ERR_FAILED, pywbem.CIM_ERR_NOT_FOUND],
- "Package %s can not be uninstalled again"%pkg)
-
- @common.mark_dangerous
- def test_check_integrity(self):
- """
- Tests CheckIntegrity call.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if ( ( common.is_installed(pkg)
- and not common.verify_pkg(pkg.name))
- or ( common.is_installed(pkg.name)
- and not common.is_installed(pkg))) :
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- objpath = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "IntegrityCheck should pass for %s"%pkg.name) # check passed
- self.assertEqual(1, len(oparms))
- self.assertIn("Failed", oparms)
- self.assertEqual(0, len(oparms["Failed"]),
- "IntegrityCheck should not fail for %s"%pkg.name)
-
- cnt_bad = 0
- for file_path in files:
- stats = os.lstat(file_path)
- if os.path.islink(file_path): # modify symbolic link
- target = os.readlink(file_path)
- os.remove(file_path)
- os.symlink(target, file_path) # just touch symlink
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- # symlink must pass
- self.assertEqual(cnt_bad, len(oparms["Failed"]),
- "Symlink %s:%s should pass"%(pkg.name, file_path))
- os.remove(file_path)
- # now change target
- os.symlink("wrong_link_target", file_path)
- elif os.path.isdir(file_path): # check directory
- os.chmod(file_path, stats.st_mode) # just touch dir
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- # dir must pass
- self.assertEqual(cnt_bad, len(oparms["Failed"]),
- "Directory %s:%s should pass"%(pkg.name, file_path))
- # modify read access of directory
- os.chmod(file_path, stats.st_mode ^ stat.S_IROTH)
- else: # modify regular file
- # just touch file - this is enough to make it fail
- with open(file_path, "w+"):
- pass
- cnt_bad += 1
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(1), rval,
- "File %s:%s should not pass"%(pkg.name, file_path))
- self.assertEqual(1, len(oparms))
- self.assertIn("Failed", oparms)
- self.assertEqual(len(oparms["Failed"]), cnt_bad,
- "Number of errors not correct. Failed for %s:%s" % (
- pkg.name, file_path))
- self.assertIn(file_path, (p["Name"] for p in oparms["Failed"]),
- "File %s:%s should also be in failed"%(pkg.name, file_path))
-
- @common.mark_dangerous
- def test_method_update(self):
- """
- Tests Update method invocation.
- """
- for pkg in self.pkgdb:
- if ( common.is_installed(pkg.name)
- and not common.is_installed(pkg, False)):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg, False)
- self.assertTrue(common.is_installed(pkg, False),
- "Package %s must be installed"%pkg.get_nevra(False))
-
- objpath = self.make_op(pkg, False)
- op_up = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Update should succed for %s"%pkg.get_nevra(False))
- self.assertEqual(1, len(oparms))
- self.assertIn("Installed", oparms)
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=op_up)
- self.assertEqual(pywbem.Uint16(0), rval,
- "Package %s should be already updated"%pkg)
- self.assertEqual(1, len(oparms))
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- self.assertFalse(common.is_installed(pkg, False),
- "Older package %s must not be installed" %
- pkg.get_nevra(False))
-
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath)
- self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0],
- "Older package %s should not be installed" %
- pkg.get_nevra(False))
-
- common.remove_pkg(pkg.name)
- self.install_pkg(pkg, False)
- self.assertTrue(common.is_installed(pkg, False))
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath,
- Epoch=pkg.epoch,
- Version=pkg.ver,
- Release=pkg.rel)
- self.assertEqual(pywbem.Uint16(0), rval,
- "Update of package %s should succeed"%pkg)
- self.assertEqual(oparms["Installed"], objpath["Software"],
- "Object paths should be the same for package %s"%pkg)
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath,
- Epoch=pkg.up_epoch,
- Version=pkg.up_ver,
- Release=pkg.up_rel)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Package %s can not be updated twice to highest version"%pkg)
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for package %s"%pkg)
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwareInstalledPackage)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/software/test/test_software_package.py b/src/software/test/test_software_package.py
deleted file mode 100755
index 4cd5da9..0000000
--- a/src/software/test/test_software_package.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareInstalledPackage provider.
-"""
-
-import pywbem
-import unittest
-
-import common
-import rpmcache
-
-class TestSoftwarePackage(common.SoftwareBaseTestCase): #pylint: disable=R0904
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwarePackage"
- KEYS = ( "Name", "SoftwareElementID", "SoftwareElementState"
- , "TargetOperatingSystem", "Version")
-
- def make_op(self, pkg, newer=True, ses=2):
- """
- @param ses SoftwareElementState property value
- @return object path of SoftwarePackage
- """
- objpath = self.objpath.copy()
- objpath["Name"] = pkg.name
- objpath["SoftwareElementID"] = pkg.get_nevra(newer, "ALWAYS")
- objpath["SoftwareElementState"] = pywbem.Uint16(ses)
- objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
- objpath["Version"] = getattr(pkg, 'up_ver' if newer else 'ver')
- return objpath
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call on packages from our rpm cache.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- objpath = self.make_op(pkg, ses=1)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(inst.path, objpath,
- "Object paths should match for %s"%pkg)
- for key in self.KEYS:
- self.assertTrue(inst.properties.has_key(key),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- if key == "TargetOperatingSystem":
- self.assertIsInstance(inst.path[key], (int, long))
- else:
- self.assertEqual(inst.path[key], inst[key],
- "Object paths of instance should match for %s"%pkg)
- self.assertEqual(pkg.up_rel, inst['Release'],
- "Release property should match for %s"%pkg)
- self.install_pkg(pkg)
- objpath['SoftwareElementState'] = pywbem.Uint16(2)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(inst.path, objpath,
- "Object paths should match for %s"%pkg)
-
- # try to leave out epoch part
- if pkg.epoch == "0":
- op_no_epoch = objpath.copy()
- op_no_epoch["SoftwareElementID"] = rpmcache.make_nevra(pkg.name,
- pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER")
- self.assertNotIn(":", op_no_epoch["SoftwareElementID"])
- inst = self.conn.GetInstance(
- InstanceName=op_no_epoch, LocalOnly=False)
- self.assertIn(inst.path, (objpath, op_no_epoch))
-
- @common.mark_dangerous
- def test_method_install(self):
- """
- Tests Install method invocation.
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- objpath = self.make_op(pkg, ses=1)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Install",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(1), rval,
- "Installation of %s should be successful"%pkg)
- self.assertEqual(1, len(oparms))
- self.assertTrue(oparms.has_key('Installed'))
- objpath['SoftwareElementState'] = pywbem.Uint16(2)
- self.assertEqual(oparms['Installed'], objpath,
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Install",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "Installation of %s should fail"%pkg)
- self.assertEqual(len(oparms), 1)
- self.assertEqual(oparms['Installed'], objpath,
- "Object paths should match for %s"%pkg)
-
- @common.mark_dangerous
- def test_method_remove(self):
- """
- Tests Remove method invocation.
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- objpath = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Remove",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Removal of package should succeed"%pkg)
- self.assertEqual(0, len(oparms))
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s should not be installed"%pkg)
- objpath["SoftwareElementState"] = pywbem.Uint16(1)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Remove",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "Removal of %s should fail"%pkg)
- self.assertEqual(len(oparms), 0)
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwarePackage)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py
new file mode 100755
index 0000000..193a4d5
--- /dev/null
+++ b/src/software/test/test_system_software_collection.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_SoftwareIdentity provider.
+"""
+
+import unittest
+
+import base
+
+class TestSystemSoftwareCollection(
+ base.SoftwareBaseTestCase): #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_SystemSoftwareCollection"
+ KEYS = ("InstanceID", )
+
+ def make_op(self):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["InstanceID"] = "LMI:SystemSoftwareCollection"
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ objpath = self.make_op()
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(list(objpath.keys()), list(inst.path.keys()))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], objpath[key])
+ self.assertIsInstance(inst["Caption"], basestring)
+
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertEqual(len(inames), 1)
+ iname = inames[0]
+ self.assertEqual(iname, self.make_op())
+
+ def test_enum_instances(self):
+ """
+ Tests EnumInstances call on installed packages.
+ """
+ insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+ self.assertEqual(len(insts), 1)
+ inst = insts[0]
+ self.assertEqual(inst.path, self.make_op())
+ self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+ self.assertIsInstance(inst["Caption"], basestring)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSystemSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/util.py b/src/software/test/util.py
new file mode 100644
index 0000000..da55e6a
--- /dev/null
+++ b/src/software/test/util.py
@@ -0,0 +1,82 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Radek Novacek <rnovacek@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+
+"""
+Common test utilities.
+"""
+
+import re
+from subprocess import check_output
+
+RE_NEVRA = re.compile(
+ r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+RE_NEVRA_OPT_EPOCH = re.compile(
+ r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+RE_ENVRA = re.compile(
+ r'^(?P<epoch>\d+|\(none\)):(?P<name>.+)-(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
+
+def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
+ """
+ @param with_epoch may be one of:
+ "NOT_ZERO" - include epoch only if it's not zero
+ "ALWAYS" - include epoch always
+ "NEVER" - do not include epoch at all
+ """
+ estr = ''
+ if with_epoch.lower() == "always":
+ estr = epoch
+ elif with_epoch.lower() == "not_zero":
+ if epoch and epoch.lower() not in {"0", "(none)"}:
+ estr = epoch
+ if len(estr):
+ estr += ":"
+ return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
+
+def make_evra(epoch, ver, rel, arch):
+ """ @return evra string """
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ return "%s:%s-%s.%s" % (epoch, ver, rel, arch)
+
+def run_yum(*params, **kwargs):
+ """
+ Runs yum with params and returns its output
+ It's here especially to allow pass a repolist argument, that
+ specifies list of repositories, to run the command on.
+ """
+ cmd = ['yum'] + list(params)
+ repolist = kwargs.get('repolist', None)
+ if repolist is None:
+ repolist = []
+ if repolist:
+ cmd += ['--disablerepo=*']
+ cmd += ['--enablerepo='+r for r in repolist]
+ return check_output(cmd)
+
+def get_system_architecture():
+ """
+ @return the system architecture name as seen by rpm
+ """
+ return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm'])
+