summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mof/LMI_Software.mof323
-rw-r--r--mof/LMI_Software.reg14
-rwxr-xr-xsrc/software/cli/software.py282
-rw-r--r--src/software/openlmi/software/LMI_HostedSoftwareCollection.py (renamed from src/software/openlmi/software/LMI_SoftwarePackageChecks.py)221
-rw-r--r--src/software/openlmi/software/LMI_MemberOfSoftwareCollection.py279
-rw-r--r--src/software/openlmi/software/LMI_SoftwareFileCheck.py247
-rw-r--r--src/software/openlmi/software/LMI_SoftwareIdentity.py190
-rw-r--r--src/software/openlmi/software/LMI_SoftwareInstalledPackage.py451
-rw-r--r--src/software/openlmi/software/LMI_SoftwarePackage.py258
-rw-r--r--src/software/openlmi/software/LMI_SystemSoftwareCollection.py174
-rw-r--r--src/software/openlmi/software/__init__.py5
-rw-r--r--src/software/openlmi/software/cimom_entry.py28
-rw-r--r--src/software/openlmi/software/core/ComputerSystem.py5
-rw-r--r--src/software/openlmi/software/core/SoftwareFileCheck.py515
-rw-r--r--src/software/openlmi/software/core/SoftwareIdentity.py278
-rw-r--r--src/software/openlmi/software/core/SoftwareInstalledPackage.py39
-rw-r--r--src/software/openlmi/software/core/SoftwarePackage.py412
-rw-r--r--src/software/openlmi/software/core/SoftwarePackageChecks.py32
-rw-r--r--src/software/openlmi/software/core/SystemSoftwareCollection.py63
-rw-r--r--src/software/openlmi/software/core/__init__.py6
-rw-r--r--src/software/openlmi/software/util/singletonmixin.py562
-rw-r--r--src/software/openlmi/software/yumdb/__init__.py8
-rw-r--r--src/software/openlmi/software/yumdb/jobs.py16
-rw-r--r--src/software/openlmi/software/yumdb/process.py22
-rw-r--r--src/software/test/base.py (renamed from src/software/test/common.py)153
-rw-r--r--src/software/test/package.py117
-rw-r--r--src/software/test/rpmcache.py422
-rwxr-xr-xsrc/software/test/run.py23
-rwxr-xr-xsrc/software/test/test_hosted_software_collection.py155
-rwxr-xr-xsrc/software/test/test_member_of_software_collection.py155
-rwxr-xr-xsrc/software/test/test_software_file_check.py527
-rwxr-xr-xsrc/software/test/test_software_identity.py154
-rwxr-xr-xsrc/software/test/test_software_installed_package.py351
-rwxr-xr-xsrc/software/test/test_software_package.py153
-rwxr-xr-xsrc/software/test/test_system_software_collection.py86
-rw-r--r--src/software/test/util.py82
36 files changed, 2709 insertions, 4099 deletions
diff --git a/mof/LMI_Software.mof b/mof/LMI_Software.mof
index fd24150..18d3035 100644
--- a/mof/LMI_Software.mof
+++ b/mof/LMI_Software.mof
@@ -21,295 +21,90 @@
#pragma locale ("en_US")
//#pragma namespace ("root/cimv2")
-[ Description (
- "RPM package installed on particular computer system with"
- " YUM (The Yellowdog Updated, Modified) package manager.")
-]
-class LMI_SoftwarePackage : CIM_SoftwareElement {
+class LMI_SoftwareIdentity : CIM_SoftwareIdentity {
- [ Description (
- "The release number (referred to in some older documentation as a"
- " \"vepoch\") is how the maintainer marks build revisions, starting"
- " from 1. When a minor change (spec file changed, patch added/removed)"
- " occurs, or a package is rebuilt to use newer headers or libraries,"
- " the release number should be incremented. If a major change"
- " (new version of the software being packaged) occurs, the version"
- " number should be changed to reflect the new software version,"
- " and the release number should be reset to 1.")
- ]
- string Release;
+ [Implemented(true), Override, Description(
+ "Unique identifier for installed or available package."
+ " It's composed of OrgID and LocalID separated by ':', where"
+ " <OrgID> is LMI and LocalID is PKG:<PKG_NEVRA>. <PKG_NEVRA>"
+ " is a string representing rpm package. Letters in NEVRA stand"
+ " for name, epoch, version, release and architecture.")]
+ string InstanceID;
- [ Description (
- "It is important to be careful with the version of post-release"
- " scheme, to ensure that package ordering is correct. It may be"
- " necessary to use Epoch to ensure that the current package is"
- " considered newer than the previous package.")
- ]
- uint16 Epoch;
+ [Implemented(true), Override, Description("Package's summary.")]
+ string Caption;
+
+ [Implemented(true), Override]
+ uint16 Classifications[];
- [ Description (
- "Architecture name, that package is compiled for. In case of no"
- "architecture dependency, this will contain \"noarch\".")
- ]
- string Architecture;
+ [Implemented(true), Override, Description("Package's description.")]
+ string Description;
- [ Description (
- "A software license is a legal instrument (usually by way of contract"
- " law) governing the usage or redistribution of software."
- " All software is copyright protected, except material in the public"
- " domain. Contractual confidentiality is another way of protecting"
- " software. A typical software license grants an end-user permission"
- " to use one or more copies of software in ways where such a use would"
- " otherwise potentially constitute copyright infringement of the"
- " software owner's exclusive rights under copyright law. List of"
- " possible values is based on list of licences approved by the"
- " Free Software Foundation and OSI. The string is obtained from RPM"
- " package." )
- ]
- string License;
+ [Implemented(true), Override, Description(
+ "Package's NEVRA string. That is also part of InstanceID.")]
+ string ElementName;
- [ Description (
- "Denotes the package group, in which the package belongs.")
- ]
- string Group;
+ [Implemented(true), Override]
+ datetime InstallDate;
- [ Description (
- "Size of RPM package in Bytes" ) ]
- uint64 Size;
+ [Implemented(true), Override]
+ boolean IsEntity;
- [ Description (
- "Will install available package."),
- ValueMap { "0", "1", "2" },
- Values { "Already installed", "Successful installation", "Failed" } ]
- uint32 Install(
- [ IN(false), OUT, Description (
- "The reference to newly installed package, if installation was"
- " successful. Reference to current package if it is already"
- " installed and Null otherwise.") ]
- LMI_SoftwareInstalledPackage REF Installed);
+ [Implemented(true), Override, Description(
+ "Name of package. This does not uniquely identify package"
+ " installed on computer system.")]
+ string Name;
- [ Description (
- "Will uninstall installed package."),
- ValueMap { "0", "1", "2" },
- Values { "Not installed", "Successful removal", "Failed" } ]
- uint32 Remove();
+ [Implemented(true), Override]
+ string TargetTypes[];
-};
-
-[ Description (
- "Identifies a file contained by RPM package. It's located"
- " in directory identified in FileName. The Invoke methods"
- " check for file existence and whether its attributes match"
- " those given by RPM package.")
-]
-class LMI_SoftwareFileCheck : CIM_FileSpecification {
-
- [ Key, Description (
- "Absolute path of file being checked.") ]
- string Name;
-
- [ Description (
- "Composition of RPM's name and absolute file path."
- " It's format is \"rpm_name#path\"") ]
- string CheckID;
-
- [ Override("SoftwareElementID"), Description (
- "RPM's nevra. This means: "
- "\"name-[epoch:]version-release.architecture\"."
- " If epoch part is missing, a \"0:\" is assumed." ) ]
- string SoftwareElementID;
-
- [ Override("Version"), Description (
- "Version of packaged software as listed by RPM package.") ]
- string Version;
-
- [ Description (
- "True, if file is present in file system.")]
- boolean FileExists;
-
- [ Override("FileSize"),
- Description("Size of installed file in Bytes.") ]
- uint64 FileSize;
- [ Units("KiloBytes"),
- Description("File size in Bytes from RPM database.") ]
- uint64 ExpectedFileSize;
-
- [ Description("File mode of installed file as a number.") ]
- uint32 FileMode;
- [ Description("File mode as a number given by RPM database.") ]
- uint32 ExpectedFileMode;
+ [Implemented(true), Override, Description(
+ "Package's EVRA, in format: "
+ "<epoch>:<version>-<release>.<architecture>")]
+ string VersionString;
- [ Description (
- "File mode of installed file as an array of permissions."
- " Each value represents a bit position in file mode."),
- ValueMap { "0", "1", "2" //XWR Other
- , "3", "4", "5" //XWR Group
- , "6", "7", "8" //XWR User
- , "9","10","11" //Sticky, SGID, SUID
- },
- Values { "Execute Other", "Write Other", "Read Other"
- , "Execute Group", "Write Group", "Read Group"
- , "Execute User" , "Write User" , "Read User"
- , "Sticky", "SGID", "SUID"
- } ]
- uint8 FileModeFlags[];
- [ Description (
- "File mode as an array of permissions of file from RPM database."
- " Each value represents a bit position in file mode."),
- ValueMap { "0", "1", "2" //XWR Other
- , "3", "4", "5" //XWR Group
- , "6", "7", "8" //XWR User
- , "9","10","11" //Sticky, SGID, SUID
- },
- Values { "Execute Other", "Write Other", "Read Other"
- , "Execute Group", "Write Group", "Read Group"
- , "Execute User" , "Write User" , "Read User"
- , "Sticky", "SGID", "SUID"
- } ]
- uint8 ExpectedFileModeFlags[];
+ [Implemented(true), Description("Package's epoch.")]
+ uint32 Epoch;
- [ Override("MD5Checksum"),
- Description("MD5 checksum of installed file.") ]
- string MD5Checksum;
- [ Description("Checksum of installed file."
- " Hash algorithm used can be obtained with ChecksumType property.") ]
- string FileChecksum;
- [ Description("Checksum of file from RPM database."
- " Hash algorithm used can be obtained with ChecksumType property.") ]
- string ExpectedFileChecksum;
+ [Implemented(true), Description("Package's version.")]
+ string Version;
- [ Description("Number of hash algorithm according to RFC4880."
- " This algorithm is used for making checksums of RPM files and"
- " packages. This algorithm is same for the whole RPM database."),
- ValueMap { "0", "1", "2", "3"
- , "8", "9","10","11" },
- Values { "UNKNOWN", "MD5", "SHA-1", "RIPE-MD/160"
- , "SHA256", "SHA384", "SHA512", "SHA224" } ]
- uint16 FileChecksumType;
+ [Implemented(true), Description("Package's release.")]
+ string Release;
- [ Description("User ID of installed file.") ]
- uint32 FileUserID;
- [ Description("User ID of file from RPM database.") ]
- uint32 ExpectedFileUserID;
+ [Implemented(true), Description("Package's architecture.")]
+ string Architecture;
- [ Description("Group ID of installed file.") ]
- uint32 FileGroupID;
- [ Description("Group ID of file from RPM database.") ]
- uint32 ExpectedFileGroupID;
-
- [ Description("Time of last modification of installed file as number a"
- " of secodns since the Epoch, 1970-01-01 00:00:00 +0000 (UTC).") ]
- uint64 LastModificationTime;
- [ Description("Time of last modification of file from RPM database as"
- " a number of secodns since the Epoch,"
- " 1970-01-01 00:00:00 +0000 (UTC).") ]
- uint64 ExpectedLastModificationTime;
+};
- [ Description("Target destination of symbolic link as returned by"
- " readline. If file is not a symbolic link, NULL is returned.") ]
- string LinkTarget;
- [ Description("Target destination of symbolic link from RPM database"
- " as returned by readline. If file is not a symbolic link,"
- " NULL is returned.") ]
- string ExpectedLinkTarget;
+class LMI_SystemSoftwareCollection : CIM_SystemSpecificCollection {
- [ Description("File type."),
- ValueMap { "0", "1", "2", "3"
- , "4", "5", "6" },
- Values { "Unknown", "File", "Directory", "Symlink"
- , "FIFO", "Character Device", "Block Device" } ]
- uint16 FileType;
- [ Description("File type of file in RPM database."),
- ValueMap { "0", "1", "2", "3"
- , "4", "5", "6" },
- Values { "Unknown", "File", "Directory", "Symlink"
- , "FIFO", "Character Device", "Block Device" } ]
- uint16 ExpectedFileType;
-
- [ Description (
- "Returns array of booleans for verification checks. Each value"
- " is either true or false, whether file passed the check or not."
- " True is present also if the check does not apply to the file"
- " (file implicitly passes the check, if check can not be"
- " performed)."
- " PassedFlagsDescriptions returns a description for each field in"
- " this array saying, what it means.") ]
- boolean PassedFlags[];
+ [Implemented(true), Override]
+ string InstanceID;
- [ Description (
- "Returns array of descriptions of checks corresponding to PassedFlags"
- " array.")]
- string PassedFlagsDescriptions[];
+ [Implemented(true), Override]
+ string Caption;
};
-// ===================================================================
-// Associations
-// ===================================================================
-[ Association ]
-class LMI_SoftwareInstalledPackage : CIM_InstalledSoftwareElement {
-
- [ Override("Software"), Weak, Description (
- "Reference to the installed RPM package.")]
- LMI_SoftwarePackage REF Software;
+[Association]
+class LMI_HostedSoftwareCollection : CIM_HostedCollection {
- [ Override("System"), Min(1), Description (
- "Reference to the ComputerSystem hosting a particular"
- " RPM package.")]
- CIM_ComputerSystem REF System;
-
- [ Description (
- "Updates the package to latest version."
- " When any of \"epoch\", \"version\" or \"release\" argument is given"
- ", install only matching available package. Otherwise try"
- " to update to newest package available."
- " If the \"epoch\", \"version\" and \"release\" of package is"
- " already installed, or no newer package is available,"
- " \"Already newest\" value is returned."),
- ValueMap { "0", "1", "2" },
- Values { "Already newest", "Successful installation", "Failed" } ]
- uint16 Update(
- [ IN(true), Description (
- "Specify particular epoch of package to update to."
- " Update to newest, when empty.") ]
- string Epoch,
- [ IN(true), Description (
- "Specify particular version of package to update to."
- " Update to newest, when empty") ]
- string Version,
- [ IN(true), Description (
- "Specify particular release of package to update to."
- " Update to newest, when empty.") ]
- string Release,
- [ IN(false), OUT, Description (
- "The reference to newly installed package, if installation was"
- " successful. Otherwise reference to current package.") ]
- LMI_SoftwareInstalledPackage REF Installed);
-
- [ Description (
- "Verify existence and attributes of files installed from"
- " RPM package. If all files installed exist and their attributes"
- " matches, method returns \"Pass\". \"Not passed\" is returned when"
- " arbitrary file differs in its attributes. And \"Error\" if"
- " verification could not be done."),
- ValueMap { "0", "1", "2" },
- Values { "Pass", "Not passed", "Error" } ]
- uint32 CheckIntegrity(
- [ IN(false), OUT, Description (
- "Array of references to LMI_SoftwareFileCheck, that did not pass"
- " verification.") ]
- LMI_SoftwareFileCheck REF Failed[]);
+ [Override("Antecedent")]
+ Linux_ComputerSystem REF Antecedent;
+ [Override("Dependent")]
+ LMI_SystemSoftwareCollection REF Dependent;
+
};
-[ Association, Aggregation ]
-class LMI_SoftwarePackageChecks : CIM_SoftwareElementChecks {
+[Association]
+class LMI_MemberOfSoftwareCollection : CIM_MemberOfCollection {
- [ Override("Element"), Description("The RPM package being checked") ]
- LMI_SoftwarePackage REF Element;
+ [Override("Collection")]
+ LMI_SystemSoftwareCollection REF Collection;
- [ Override("Check"), Description("The Check for the element.") ]
- LMI_SoftwareFileCheck REF Check;
+ [Override("Member")]
+ LMI_SoftwareIdentity REF Member;
};
-
diff --git a/mof/LMI_Software.reg b/mof/LMI_Software.reg
index 1c6f323..37f55a3 100644
--- a/mof/LMI_Software.reg
+++ b/mof/LMI_Software.reg
@@ -1,22 +1,22 @@
-[LMI_SoftwarePackage]
+[LMI_SoftwareIdentity]
provider: /usr/lib/python2.7/site-packages/openlmi/software/cimom_entry.py
location: pyCmpiProvider
- type: instance method
+ type: instance
namespace: root/cimv2
-[LMI_SoftwareInstalledPackage]
+[LMI_SystemSoftwareCollection]
provider: /usr/lib/python2.7/site-packages/openlmi/software/cimom_entry.py
location: pyCmpiProvider
- type: instance association method
+ type: instance
namespace: root/cimv2
-[LMI_SoftwareFileCheck]
+[LMI_HostedSoftwareCollection]
provider: /usr/lib/python2.7/site-packages/openlmi/software/cimom_entry.py
location: pyCmpiProvider
- type: instance method
+ type: instance association
namespace: root/cimv2
-[LMI_SoftwarePackageChecks]
+[LMI_MemberOfSoftwareCollection]
provider: /usr/lib/python2.7/site-packages/openlmi/software/cimom_entry.py
location: pyCmpiProvider
type: instance association
diff --git a/src/software/cli/software.py b/src/software/cli/software.py
index 5368e5f..14c26df 100755
--- a/src/software/cli/software.py
+++ b/src/software/cli/software.py
@@ -19,20 +19,21 @@
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Command line tool for simple software management with OpenLMI CIM software
+providers.
+"""
+
import argparse
from collections import defaultdict
-import os
import pywbem
-import re
import socket
-import subprocess
import sys
-import unittest
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+from openlmi.software import util
+from openlmi.software.core import SystemSoftwareCollection
-cim_error2text = defaultdict(lambda: "OTHER_ERROR", {
+CIM_ERROR2TEXT = defaultdict(lambda: "OTHER_ERROR", {
1 : "FAILED",
2 : "ACCESS_DENIED",
3 : "INVALID_NAMESPACE",
@@ -53,146 +54,74 @@ cim_error2text = defaultdict(lambda: "OTHER_ERROR", {
})
class NotFound(Exception):
+ """
+ Exception raised, when desired package could not be found.
+ """
def __init__(self, package, details=None):
- msg = 'Package "%s" not installed!'%package
+ msg = 'Package "%s" not installed!' % package
if details is not None:
msg += ' : '+details
Exception.__init__(self, msg)
-HOSTNAME = None
def get_host_name():
- global HOSTNAME
- if HOSTNAME is None:
- HOSTNAME = socket.gethostname()
- return HOSTNAME
-
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
"""
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
+ @return computer host name
"""
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr): estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
+ if not hasattr(get_host_name, '_hostname'):
+ get_host_name._hostname = socket.gethostname() #pylint: disable=W0212
+ return get_host_name._hostname #pylint: disable=W0212
def make_pkg_path(name, epoch, ver, rel, arch):
+ """
+ @return instance name for LMI_SoftwareIdentity
+ """
return pywbem.CIMInstanceName(
- classname="LMI_SoftwarePackage", namespace="root/cimv2",
+ classname="LMI_SoftwareIdentity", namespace="root/cimv2",
keybindings={
- "Name" : name,
- "SoftwareElementID" : make_nevra(
- name, epoch, ver, rel, arch, "ALWAYS"),
- "SoftwareElementState" : pywbem.Uint16(2),
- "TargetOperatingSystem" : pywbem.Uint16(36),
- "Version" : ver })
-
-def make_inst_pkg_path(*args):
- op = pywbem.CIMInstanceName(classname="LMI_SoftwareInstalledPackage",
- namespace='root/cimv2')
- system_op = pywbem.CIMInstanceName(
- classname="CIM_ComputerSystem", namespace="root/cimv2",
- keybindings={
- "CreationClassName" : "CIM_ComputerSystem",
- "Name" : get_host_name() })
- op["Software"] = make_pkg_path(*args)
- op["System"] = system_op
- return op
-
-def get_instance_name(package, is_nevra=False, installed=True):
- m = re_nevra.match(package)
- if not m and is_nevra:
- raise ValueError('Expected a valid nevra!.')
- if not m and not installed:
- raise ValueError('You must supply a valid nevra!')
- if not m: # given only a name of package
- # try to enumerate installed packages and find a correct one
- inames = conn.EnumerateInstanceNames(
- ClassName='LMI_SoftwareInstalledPackage',
- namespace='root/cimv2')
- for iname in inames:
- if iname['Software']['Name'] == package:
- break
- else:
- raise NotFound(package)
- else: # nevra given
- try:
- args = [m.group(k) for k in 'name', 'epoch', 'ver', 'rel', 'arch']
- if not args[1]: # epoch not given
- args[1] = '0'
- func = (make_pkg_path, make_inst_pkg_path)[1 if installed else 0]
- inst = conn.GetInstance(InstanceName=func(*args), LocalOnly=False)
- iname = inst.path
- except pywbem.CIMError as e:
- if pywbem.CIM_ERR_NOT_FOUND:
- raise NotFound(package, e.args[1])
- raise
- return iname
-
-def install(conn, nevra):
- iname = get_instance_name(nevra, True, False)
- (rval, oparms) = conn.InvokeMethod(MethodName='Install', ObjectName=iname)
- print (rval, oparms)
-
-def update(conn, package, epoch=None, version=None, release=None):
- iname = get_instance_name(package)
- kwargs = dict(MethodName='Update', ObjectName=iname)
- for name, val in ( ('Epoch', epoch), ('Version', version)
- , ('Release', release)):
- if val is None: continue
- kwargs[name] = str(val)
- (rval, oparms) = conn.InvokeMethod(**kwargs)
- print (rval, oparms)
-
-def remove(conn, package):
- iname = get_instance_name(package)
- conn.DeleteInstance(iname)
-
-def verify(conn, package):
- iname = get_instance_name(package)
- (rval, oparms) = conn.InvokeMethod(MethodName='CheckIntegrity',
- ObjectName=iname)
- if rval == 0:
- print "Passed"
- elif rval == 2:
- print "Error"
- else: # Not Passed
- print "Not Passed:"
- for f in oparms["Failed"]:
- inst = conn.GetInstance(InstanceName=f, LocalOnly=False)
- print " %s"%inst["Name"]
- if not inst['FileExists']:
- print " - does not exist"
- else:
- for arg in ( 'Checksum', 'LinkTarget', 'FileGroupID'
- , 'FileUserID', 'FileMode', 'FileSize'
- , 'FileType', 'LastModificationTime'):
- if inst['Expected'+arg] != inst[arg]:
- print " - %s\t: %s != %s"%(arg,
- inst['Expected'+arg], inst[arg])
-
-def list_installed(conn):
- inames = conn.EnumerateInstanceNames(
- ClassName='LMI_SoftwareInstalledPackage',
- namespace='root/cimv2')
- for nevra in sorted((i['Software']['SoftwareElementID'] for i in inames)):
- print nevra
-
-def list_files(conn, package):
- iname = get_instance_name(package)
- # TODO: find out, why passing role='Check' failes
- inames = conn.ReferenceNames(ObjectName=iname['Software'],
- ResultClass='LMI_SoftwarePackageChecks')
- for i in inames:
- print i['Check']['Name']
-
-if __name__ == '__main__':
+ "InstanceID" : util.make_nevra(
+ name, epoch, ver, rel, arch, "ALWAYS")})
+
+def install(_conn, _nevra):
+ """Install package by nevra."""
+ raise NotImplementedError("Installation is not yet supported!")
+
+def update(_conn, _package, _epoch=None, _version=None, _release=None):
+ """Update to particular evr of package."""
+ raise NotImplementedError("Update of package is not yet supported!")
+
+def remove(_conn, _package):
+ """Remove installed package by its name."""
+ raise NotImplementedError("Removal is not yet supported!")
+
+def verify(_conn, _package):
+ """Verity installed package by its name."""
+ raise NotImplementedError("Verification is not yet supported!")
+
+def list_available(conn):
+ """List nevra strings of available packages."""
+ inames = conn.AssociatorNames(
+ ObjectName=SystemSoftwareCollection.get_path(),
+ ResultClass='LMI_SoftwareIdentity',
+ AssocClass="LMI_MemberOfSoftwareCollection",
+ Role="Collection",
+ ResultRole="Member")
+ for nevra in (i['InstanceID'] for i in inames):
+ print nevra[len("LMI:PKG:"):]
+
+def list_installed(_conn):
+ """List nevra strings of installed packages."""
+ raise NotImplementedError(
+ "Listing of installed packages is not yet supported!")
+
+def list_files(_conn, _package):
+ """List files of installed package."""
+ raise NotImplementedError("Listing of package files is not yet supported!")
+
+def parse_args():
+ """
+ Parse command line arguments and handle related errors.
+ @return Namespace object
+ """
parser = argparse.ArgumentParser(prog='software',
description=("CLI tool for testing OpenLMI software providers."
" With this tool you are able to install, update,"
@@ -201,12 +130,12 @@ if __name__ == '__main__':
parser.add_argument('--url',
help="Specify schema, hostname and port of broker in one argument."
" For user and password, please use provided options.")
- parser.add_argument('-p', '--port', type=int, default=5988,
- help="Port of cimom broker.")
+ parser.add_argument('-p', '--port', type=int, default=5989,
+ help="Port of cimom broker. Default is %(default)d.")
parser.add_argument('-h', '--hostname',
default='localhost', help="Hostname of cimom broker.")
parser.add_argument('-s', '--schema', choices=('http', 'https'),
- default='http')
+ default='https', help="Schema part of url (default is %(default)s)")
parser.add_argument('-u', '--user', default='',
help="Under which user to authenticate.")
parser.add_argument('--password', default='',
@@ -247,44 +176,65 @@ if __name__ == '__main__':
parse_verify.set_defaults(command='verify')
parse_list = subpars.add_parser('list',
- help="List installed packages.")
+ help="List various information depending on next argument."
+ " See \"%(prog)s list --help\".")
parse_list.set_defaults(command='list')
- parse_files = subpars.add_parser('list-files',
- help="List files of installed package.")
- parse_files.add_argument('package',
+ list_subpars = parse_list.add_subparsers(help="What should be listed.")
+ parse_list_available = list_subpars.add_parser("available",
+ help="List available packages.")
+ parse_list_available.set_defaults(list_kind='available')
+ parse_list_installed = list_subpars.add_parser("installed",
+ help="List installed packages.")
+ parse_list_installed.set_defaults(list_kind='installed')
+ parse_list_files = list_subpars.add_parser("files",
+ help="List files of installed package.")
+ parse_list_files.set_defaults(list_kind='files')
+ parse_list_files.add_argument('package',
help="Name or nevra of installed package.")
- parse_files.set_defaults(command='list-files')
args = parser.parse_args()
- if args.url is not None:
- url = args.url
- else:
- url = '%s://%s:%d' % (args.schema, args.hostname, args.port)
- HOSTNAME = re.match('^https?://([^:]+)', url).group(1)
+ if args.url is None:
+ args.url = '%s://%s:%d' % (args.schema, args.hostname, args.port)
+
+ return args
+
+def main(args):
+ """
+ Main functionality.
+ @return return code of script
+ """
auth = (args.user, args.password)
if args.debug:
- sys.stderr.write('url:\t%s\n'%url)
- conn = pywbem.WBEMConnection(url, auth)
+ sys.stderr.write('url:\t%s\n'%args.url)
+ conn = pywbem.WBEMConnection(args.url, auth)
- func, attrs = \
- { 'install' : (install, ('nevra',))
- , 'update' : (update, ('package', 'epoch', 'version', 'release'))
- , 'remove' : (remove, ('package',))
- , 'verify' : (verify, ('package',))
- , 'list' : (list_installed, tuple())
- , 'list-files' : (list_files, ('package',))
- }[args.command]
+ if args.command == 'list':
+ func, attrs = \
+ { 'available' : (list_available, tuple())
+ , 'installed' : (list_installed, tuple())
+ , 'files' : (list_files, ('package'))
+ }[args.list_kind]
+ else:
+ func, attrs = \
+ { 'install' : (install, ('nevra',))
+ , 'update' : (update, ('package', 'epoch', 'version', 'release'))
+ , 'remove' : (remove, ('package',))
+ , 'verify' : (verify, ('package',))
+ }[args.command]
try:
func(conn, *(getattr(args, a) for a in attrs))
- sys.exit(0)
- except pywbem.CIMError as e:
+ return 0
+ except pywbem.CIMError as exc:
sys.stderr.write('Failed: %d (%s) - %s\n' % (
- e.args[0], cim_error2text[e.args[0]],
- e.args[1].replace('<br>', '\n')))
- sys.exit(1)
- except NotFound as e:
- sys.stderr.write(str(e) + '\n')
- sys.exit(1)
+ exc.args[0], CIM_ERROR2TEXT[exc.args[0]],
+ exc.args[1].replace('<br>', '\n')))
+ return 1
+ except NotFound as exc:
+ sys.stderr.write(str(exc) + '\n')
+ return 1
+if __name__ == '__main__':
+ ARGS = parse_args()
+ sys.exit(main(ARGS))
diff --git a/src/software/openlmi/software/LMI_SoftwarePackageChecks.py b/src/software/openlmi/software/LMI_HostedSoftwareCollection.py
index 48faabc..ad8b6bf 100644
--- a/src/software/openlmi/software/LMI_SoftwarePackageChecks.py
+++ b/src/software/openlmi/software/LMI_HostedSoftwareCollection.py
@@ -2,26 +2,22 @@
#
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
#
-# This library is distributed in the hope that it will be useful,
+# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
-"""Python Provider for LMI_SoftwarePackageChecks
+"""Python Provider for LMI_HostedSoftwareCollection
-Instruments the CIM class LMI_SoftwarePackageChecks
+Instruments the CIM class LMI_HostedSoftwareCollection
"""
@@ -29,18 +25,16 @@ import pywbem
from pywbem.cim_provider2 import CIMProvider2
from openlmi.common import cmpi_logging
-from openlmi.software.core import (SoftwarePackage, SoftwareFileCheck)
-from openlmi.software.yumdb import YumDB
+from openlmi.software.core import ComputerSystem, SystemSoftwareCollection
-class LMI_SoftwarePackageChecks(CIMProvider2):
- """Instrument the CIM class LMI_SoftwarePackageChecks
-
- This association ties a SoftwareElement to a specific Check to validate
- its state or its movement to the next state. Note that
- SoftwareElements in a running state cannot transition to another
- state. Therefore, the value of the Phase property is restricted to 0
- ("In-State") for SoftwareElements in the running state.
+class LMI_HostedSoftwareCollection(CIMProvider2):
+ """Instrument the CIM class LMI_HostedSoftwareCollection
+ HostedSoftwareCollection defines a SystemSpecificCollection in the context of a
+ scoping System. It represents a Collection that has meaning only in
+ the context of a System, a Collection whose elements are restricted by
+ the definition of the System, or both of these types of Collections.
+
"""
def __init__ (self, _env):
@@ -53,35 +47,35 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
Keyword arguments:
env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstance to be returned. The
- key properties are set on this instance to correspond to the
+ model -- A template of the pywbem.CIMInstance to be returned. The
+ key properties are set on this instance to correspond to the
instanceName that was requested. The properties of the model
- are already filtered according to the PropertyList from the
+ are already filtered according to the PropertyList from the
request. Only properties present in the model need to be
- given values. If you prefer, you can set all of the
- values, and the instance will be filtered for you.
+ given values. If you prefer, you can set all of the
+ values, and the instance will be filtered for you.
Possible Errors:
CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
Instance does not exist in the specified namespace)
CIM_ERR_FAILED (some other unspecified error occurred)
"""
- if not "Check" in model:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Check property.")
- if not "Element" in model:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Element property.")
-
- pkg_info, pkg_check, pkg_file = \
- SoftwareFileCheck.object_path2pkg_file(model['Check'])
- model['Check'] = SoftwareFileCheck.filecheck2model(
- pkg_info, pkg_check, pkg_file.path)
- model['Element'] = SoftwarePackage.pkg2model(pkg_info)
+ for key_prop in ("Antecedent", "Dependent"):
+ if not key_prop in model:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Missing Antecedent key property!")
+ ComputerSystem.check_path_property(env, model, "Antecedent")
+ SystemSoftwareCollection.check_path_property(env, model, "Dependent")
+
+ model.path.update({"Antecedent":None, "Dependent":None})
+
+ model["Antecedent"] = ComputerSystem.get_path()
+ model["Dependent"] = SystemSoftwareCollection.get_path()
+
return model
@cmpi_logging.trace_method
@@ -89,17 +83,17 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
"""Enumerate instances.
The WBEM operations EnumerateInstances and EnumerateInstanceNames
- are both mapped to this method.
+ are both mapped to this method.
This method is a python generator
Keyword arguments:
env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstances to be generated.
- The properties of the model are already filtered according to
- the PropertyList from the request. Only properties present in
- the model need to be given values. If you prefer, you can
- always set all of the values, and the instance will be filtered
- for you.
+ model -- A template of the pywbem.CIMInstances to be generated.
+ The properties of the model are already filtered according to
+ the PropertyList from the request. Only properties present in
+ the model need to be given values. If you prefer, you can
+ always set all of the values, and the instance will be filtered
+ for you.
keys_only -- A boolean. True if only the key properties should be
set on the generated instances.
@@ -107,8 +101,16 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
CIM_ERR_FAILED (some other unspecified error occurred)
"""
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "Enumeration of instances is not supported.")
+ # Prime model.path with knowledge of the keys, so key values on
+ # the CIMInstanceName (model.path) will automatically be set when
+ # we set property values on the model.
+ model.path.update({'Dependent': None, 'Antecedent': None})
+
+ model["Antecedent"] = ComputerSystem.get_path()
+ model["Dependent"] = SystemSoftwareCollection.get_path()
+
+ yield model
+
@cmpi_logging.trace_method
def set_instance(self, env, instance, modify_existing):
@@ -116,22 +118,22 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
Keyword arguments:
env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance -- The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
+ instance -- The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
the PropertyList from the request.
modify_existing -- True if ModifyInstance, False if CreateInstance
- Return the new instance. The keys must be set on the new instance.
+ Return the new instance. The keys must be set on the new instance.
Possible Errors:
CIM_ERR_ACCESS_DENIED
CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
or otherwise incorrect parameters)
- CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
+ CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
valid if modify_existing is False, indicating that the operation
was CreateInstance)
- CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
+ CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
if modify_existing is True, indicating that the operation
was ModifyInstance)
CIM_ERR_FAILED (some other unspecified error occurred)
@@ -145,61 +147,61 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
Keyword arguments:
env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance_name -- A pywbem.CIMInstanceName specifying the instance
+ instance_name -- A pywbem.CIMInstanceName specifying the instance
to delete.
Possible Errors:
CIM_ERR_ACCESS_DENIED
CIM_ERR_NOT_SUPPORTED
CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
or otherwise incorrect parameters)
- CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
+ CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
namespace)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
Instance does not exist in the specified namespace)
CIM_ERR_FAILED (some other unspecified error occurred)
- """
+ """
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
+
@cmpi_logging.trace_method
def references(self, env, object_name, model, result_class_name, role,
result_role, keys_only):
"""Instrument Associations.
- All four association-related operations (Associators, AssociatorNames,
- References, ReferenceNames) are mapped to this method.
+ All four association-related operations (Associators, AssociatorNames,
+ References, ReferenceNames) are mapped to this method.
This method is a python generator
Keyword arguments:
env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName that defines the source
+ object_name -- A pywbem.CIMInstanceName that defines the source
CIM Object whose associated Objects are to be returned.
model -- A template pywbem.CIMInstance to serve as a model
of the objects to be returned. Only properties present on this
- model need to be set.
- result_class_name -- If not empty, this string acts as a filter on
- the returned set of Instances by mandating that each returned
- Instances MUST represent an association between object_name
+ model need to be set.
+ result_class_name -- If not empty, this string acts as a filter on
+ the returned set of Instances by mandating that each returned
+ Instances MUST represent an association between object_name
and an Instance of a Class whose name matches this parameter
- or a subclass.
- role -- If not empty, MUST be a valid Property name. It acts as a
- filter on the returned set of Instances by mandating that each
- returned Instance MUST refer to object_name via a Property
+ or a subclass.
+ role -- If not empty, MUST be a valid Property name. It acts as a
+ filter on the returned set of Instances by mandating that each
+ returned Instance MUST refer to object_name via a Property
whose name matches the value of this parameter.
- result_role -- If not empty, MUST be a valid Property name. It acts
- as a filter on the returned set of Instances by mandating that
- each returned Instance MUST represent associations of
- object_name to other Instances, where the other Instances play
- the specified result_role in the association (i.e. the
- name of the Property in the Association Class that refers to
- the Object related to object_name MUST match the value of this
+ result_role -- If not empty, MUST be a valid Property name. It acts
+ as a filter on the returned set of Instances by mandating that
+ each returned Instance MUST represent associations of
+ object_name to other Instances, where the other Instances play
+ the specified result_role in the association (i.e. the
+ name of the Property in the Association Class that refers to
+ the Object related to object_name MUST match the value of this
parameter).
keys_only -- A boolean. True if only the key properties should be
set on the generated instances.
- The following diagram may be helpful in understanding the role,
+ The following diagram may be helpful in understanding the role,
result_role, and result_class_name parameters.
+------------------------+ +-------------------+
| object_name.classname | | result_class_name |
@@ -217,47 +219,20 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
CIM_ERR_ACCESS_DENIED
CIM_ERR_NOT_SUPPORTED
CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
or otherwise incorrect parameters)
CIM_ERR_FAILED (some other unspecified error occurred)
"""
- cimhandle = env.get_cimom_handle()
-
- # Prime model.path with knowledge of the keys, so key values on
- # the CIMInstanceName (model.path) will automatically be set when
- # we set property values on the model.
- model.path.update({'Check': None, 'Element': None})
-
- with YumDB.getInstance() as ydb:
- if ( (not role or role.lower() == 'element')
- and cimhandle.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='LMI_SoftwarePackage')):
- filecheck_model = pywbem.CIMInstanceName(
- classname='LMI_SoftwareFileCheck',
- namespace="root/cimv2",
- host=model.path.host)
- pkg_info = SoftwarePackage.object_path2pkg(object_name,
- kind="installed")
- model['Element'] = SoftwarePackage.pkg2model(pkg_info)
-
- pkg_check = ydb.check_package(pkg_info)
- for file_name in pkg_check:
- model['Check'] = SoftwareFileCheck.filecheck2model(
- pkg_info, pkg_check, file_name,
- model=filecheck_model)
- yield model
-
- if ( (not role or role.lower() == 'check')
- and cimhandle.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='LMI_SoftwareFileCheck')):
- pkg_info, pkg_check, pkg_file = \
- SoftwareFileCheck.object_path2pkg_file(object_name)
- model['Check'] = SoftwareFileCheck.filecheck2model(
- pkg_info, pkg_check, pkg_file.path)
-
- model['Element'] = SoftwarePackage.pkg2model(pkg_info)
- yield model
+ ch = env.get_cimom_handle()
+ # If you want to get references for free, implemented in terms
+ # of enum_instances, just leave the code below unaltered.
+ if ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='CIM_SystemSpecificCollection') or \
+ ch.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='CIM_System'):
+ return self.simple_refs(env, object_name, model,
+ result_class_name, role, result_role, keys_only)
diff --git a/src/software/openlmi/software/LMI_MemberOfSoftwareCollection.py b/src/software/openlmi/software/LMI_MemberOfSoftwareCollection.py
new file mode 100644
index 0000000..ae79bf7
--- /dev/null
+++ b/src/software/openlmi/software/LMI_MemberOfSoftwareCollection.py
@@ -0,0 +1,279 @@
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Python Provider for LMI_MemberOfSoftwareCollection
+
+Instruments the CIM class LMI_MemberOfSoftwareCollection
+
+"""
+
+import pywbem
+from pywbem.cim_provider2 import CIMProvider2
+
+from openlmi.common import cmpi_logging
+from openlmi.software.core import SystemSoftwareCollection
+from openlmi.software.core import SoftwareIdentity
+from openlmi.software.yumdb import YumDB
+
+class LMI_MemberOfSoftwareCollection(CIMProvider2):
+ """Instrument the CIM class LMI_MemberOfSoftwareCollection
+
+ LMI_MemberOfSoftwareCollection is an aggregation used to establish membership
+ of ManagedElements in a Collection.
+
+ """
+
+ def __init__(self, _env):
+ cmpi_logging.logger.debug('Initializing provider %s from %s' \
+ % (self.__class__.__name__, __file__))
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstance to be returned. The
+ key properties are set on this instance to correspond to the
+ instanceName that was requested. The properties of the model
+ are already filtered according to the PropertyList from the
+ request. Only properties present in the model need to be
+ given values. If you prefer, you can set all of the
+ values, and the instance will be filtered for you.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ SystemSoftwareCollection.check_path_property(env, model, "Collection")
+
+ if not 'Member' in model:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ 'Missing "Member" key property!')
+ if not isinstance(model['Member'], pywbem.CIMInstanceName):
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Expected object path for Member!")
+
+ model['Collection'] = SystemSoftwareCollection.get_path()
+ with YumDB.get_instance():
+ pkg_info = SoftwareIdentity.object_path2pkg(
+ model['Member'], 'available')
+ model['Member'] = SoftwareIdentity.pkg2model(pkg_info)
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances.
+
+ The WBEM operations EnumerateInstances and EnumerateInstanceNames
+ are both mapped to this method.
+ This method is a python generator
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstances to be generated.
+ The properties of the model are already filtered according to
+ the PropertyList from the request. Only properties present in
+ the model need to be given values. If you prefer, you can
+ always set all of the values, and the instance will be filtered
+ for you.
+ keys_only -- A boolean. True if only the key properties should be
+ set on the generated instances.
+
+ Possible Errors:
+ CIM_ERR_FAILED (some other unspecified error occurred)
+ """
+ # Prime model.path with knowledge of the keys, so key values on
+ # the CIMInstanceName (model.path) will automatically be set when
+ # we set property values on the model.
+ model.path.update({'Member': None, 'Collection': None})
+
+ model['Collection'] = SystemSoftwareCollection.get_path()
+ member_model = pywbem.CIMInstanceName(
+ classname="LMI_SoftwareIdentity",
+ namespace="root/cimv2")
+ with YumDB.get_instance() as yb:
+ pl = yb.get_package_list('available',
+ allow_duplicates=True,
+ sort=True)
+ for pkg in pl:
+ model['Member'] = SoftwareIdentity.pkg2model(
+ pkg, model=member_model)
+ yield model
+
+ @cmpi_logging.trace_method
+ def set_instance(self, env, instance, modify_existing):
+ """Return a newly created or modified instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance -- The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
+ the PropertyList from the request.
+ modify_existing -- True if ModifyInstance, False if CreateInstance
+
+ Return the new instance. The keys must be set on the new instance.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
+ valid if modify_existing is False, indicating that the operation
+ was CreateInstance)
+ CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
+ if modify_existing is True, indicating that the operation
+ was ModifyInstance)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
+ @cmpi_logging.trace_method
+ def delete_instance(self, env, instance_name):
+ """Delete an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance_name -- A pywbem.CIMInstanceName specifying the instance
+ to delete.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_NAMESPACE
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
+ namespace)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
+ @cmpi_logging.trace_method
+ def references(self, env, object_name, model, result_class_name, role,
+ result_role, keys_only):
+ """Instrument Associations.
+
+ All four association-related operations (Associators, AssociatorNames,
+ References, ReferenceNames) are mapped to this method.
+ This method is a python generator
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ object_name -- A pywbem.CIMInstanceName that defines the source
+ CIM Object whose associated Objects are to be returned.
+ model -- A template pywbem.CIMInstance to serve as a model
+ of the objects to be returned. Only properties present on this
+ model need to be set.
+ result_class_name -- If not empty, this string acts as a filter on
+ the returned set of Instances by mandating that each returned
+ Instances MUST represent an association between object_name
+ and an Instance of a Class whose name matches this parameter
+ or a subclass.
+ role -- If not empty, MUST be a valid Property name. It acts as a
+ filter on the returned set of Instances by mandating that each
+ returned Instance MUST refer to object_name via a Property
+ whose name matches the value of this parameter.
+ result_role -- If not empty, MUST be a valid Property name. It acts
+ as a filter on the returned set of Instances by mandating that
+ each returned Instance MUST represent associations of
+ object_name to other Instances, where the other Instances play
+ the specified result_role in the association (i.e. the
+ name of the Property in the Association Class that refers to
+ the Object related to object_name MUST match the value of this
+ parameter).
+ keys_only -- A boolean. True if only the key properties should be
+ set on the generated instances.
+
+ The following diagram may be helpful in understanding the role,
+ result_role, and result_class_name parameters.
+ +------------------------+ +-------------------+
+ | object_name.classname | | result_class_name |
+ | ~~~~~~~~~~~~~~~~~~~~~ | | ~~~~~~~~~~~~~~~~~ |
+ +------------------------+ +-------------------+
+ | +-----------------------------------+ |
+ | | [Association] model.classname | |
+ | object_name | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
+ +--------------+ object_name.classname REF role | |
+ (CIMInstanceName) | result_class_name REF result_role +------+
+ | |(CIMInstanceName)
+ +-----------------------------------+
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_NAMESPACE
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ cimhandle = env.get_cimom_handle()
+
+ model.path.update({'Collection': None, 'Member': None})
+
+ with YumDB.get_instance() as ydb:
+ if ( (not role or role.lower() == 'collection')
+ and cimhandle.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='LMI_SystemSoftwareCollection')
+ and "InstanceID" in object_name):
+ if object_name["InstanceID"] == \
+ SystemSoftwareCollection.get_path()["InstanceID"]:
+ pkg_model = pywbem.CIMInstanceName(
+ classname='LMI_SoftwareIdentity',
+ namespace="root/cimv2",
+ host=model.path.host)
+ model["Collection"] = SystemSoftwareCollection.get_path()
+ for pkg_info in ydb.get_package_list('available',
+ allow_duplicates=True, sort=True):
+ model["Member"] = SoftwareIdentity.pkg2model(
+ pkg_info, model=pkg_model)
+ yield model
+
+ if ( (not role or role.lower() == 'member')
+ and cimhandle.is_subclass(object_name.namespace,
+ sub=object_name.classname,
+ super='LMI_SoftwareIdentity')):
+ try:
+ pkg_info = SoftwareIdentity.object_path2pkg(
+ object_name, kind="available")
+ model['Member'] = SoftwareIdentity.pkg2model(pkg_info)
+ model["Collection"] = SystemSoftwareCollection.get_path()
+ except pywbem.CIMError as exc:
+ if exc.args[0] == pywbem.CIM_ERR_NOT_FOUND:
+ msg = "Could not find requested package%s."
+ if "InstanceID" in object_name:
+ msg = msg % (' with InstanceID="%s"' % \
+ object_name["InstanceID"])
+ else:
+ msg = msg % ""
+ raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, msg)
+ else:
+ raise
+ yield model
+
diff --git a/src/software/openlmi/software/LMI_SoftwareFileCheck.py b/src/software/openlmi/software/LMI_SoftwareFileCheck.py
deleted file mode 100644
index 58470bb..0000000
--- a/src/software/openlmi/software/LMI_SoftwareFileCheck.py
+++ /dev/null
@@ -1,247 +0,0 @@
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""Python Provider for LMI_SoftwareFileCheck
-
-Instruments the CIM class LMI_SoftwareFileCheck
-
-"""
-
-import pywbem
-from pywbem.cim_provider2 import CIMProvider2
-
-from openlmi.common import cmpi_logging
-from openlmi.software.core import SoftwareFileCheck
-from openlmi.software.yumdb import YumDB
-
-class LMI_SoftwareFileCheck(CIMProvider2):
- """Instrument the CIM class LMI_SoftwareFileCheck
-
- Identifies a file contained by RPM package. It's located in directory
- identified in FileName. The Invoke methods check for file existence
- and whether its attributes match those given by RPM package.
-
- """
-
- def __init__(self, _env):
- cmpi_logging.logger.debug('Initializing provider %s from %s' \
- % (self.__class__.__name__, __file__))
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstance to be returned. The
- key properties are set on this instance to correspond to the
- instanceName that was requested. The properties of the model
- are already filtered according to the PropertyList from the
- request. Only properties present in the model need to be
- given values. If you prefer, you can set all of the
- values, and the instance will be filtered for you.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- with YumDB.getInstance():
- pkg_info, pkg_check, pkg_file = \
- SoftwareFileCheck.object_path2pkg_file(model.path)
- return SoftwareFileCheck.filecheck2model(
- pkg_info, pkg_check, pkg_file.path, keys_only=False)
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances.
-
- The WBEM operations EnumerateInstances and EnumerateInstanceNames
- are both mapped to this method.
- This method is a python generator
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstances to be generated.
- The properties of the model are already filtered according to
- the PropertyList from the request. Only properties present in
- the model need to be given values. If you prefer, you can
- always set all of the values, and the instance will be filtered
- for you.
- keys_only -- A boolean. True if only the key properties should be
- set on the generated instances.
-
- Possible Errors:
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- # this won't be supported because of enormous amount of data
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def set_instance(self, env, instance, modify_existing):
- """Return a newly created or modified instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance -- The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
- the PropertyList from the request.
- modify_existing -- True if ModifyInstance, False if CreateInstance
-
- Return the new instance. The keys must be set on the new instance.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
- valid if modify_existing is False, indicating that the operation
- was CreateInstance)
- CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
- if modify_existing is True, indicating that the operation
- was ModifyInstance)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def delete_instance(self, env, instance_name):
- """Delete an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance_name -- A pywbem.CIMInstanceName specifying the instance
- to delete.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
- namespace)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def cim_method_invoke(self, env, object_name):
- """Implements LMI_SoftwareFileCheck.Invoke()
-
- The Invoke method evaluates this Check. The details of the
- evaluation are described by the specific subclasses of CIM_Check.
- When the SoftwareElement being checked is already installed, the
- CIM_InstalledSoftwareElement association identifies the
- CIM_ComputerSystem in whose context the Invoke is executed. If
- this association is not in place, then the InvokeOnSystem method
- should be used - since it identifies the TargetSystem as an input
- parameter of the method. \nThe results of the Invoke method are
- based on the return value. A zero is returned if the condition is
- satisfied. A one is returned if the method is not supported. Any
- other value indicates the condition is not satisfied.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method Invoke()
- should be invoked.
-
- Returns a two-tuple containing the return value (type pywbem.Uint32)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters: none
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- with YumDB.getInstance():
- _, pkg_check, pkg_file = \
- SoftwareFileCheck.object_path2pkg_file(object_name)
- sfc = SoftwareFileCheck.test_file(
- pkg_check.file_checksum_type, pkg_file)
- out_params = []
- ret = 0 if SoftwareFileCheck.filecheck_passed(sfc) else 2
- return (pywbem.Uint32(ret), out_params)
-
- @cmpi_logging.trace_method
- def cim_method_invokeonsystem(self, env, object_name,
- param_targetsystem=None):
- """Implements LMI_SoftwareFileCheck.InvokeOnSystem()
-
- The InvokeOnSystem method evaluates this Check. The details of the
- evaluation are described by the specific subclasses of CIM_Check.
- The method\'s TargetSystem input parameter specifies the
- ComputerSystem in whose context the method is invoked. \nThe
- results of the InvokeOnSystem method are based on the return
- value. A zero is returned if the condition is satisfied. A one is
- returned if the method is not supported. Any other value indicates
- the condition is not satisfied.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method InvokeOnSystem()
- should be invoked.
- param_targetsystem -- The input parameter TargetSystem (
- type REF (pywbem.CIMInstanceName(
- classname='CIM_ComputerSystem', ...))
- Reference to ComputerSystem in whose context the method is to
- be invoked.
-
-
- Returns a two-tuple containing the return value (type pywbem.Uint32)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters: none
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- # TODO do something
- raise pywbem.CIMError(pywbem.CIM_ERR_METHOD_NOT_AVAILABLE)
-
diff --git a/src/software/openlmi/software/LMI_SoftwareIdentity.py b/src/software/openlmi/software/LMI_SoftwareIdentity.py
new file mode 100644
index 0000000..b97ca25
--- /dev/null
+++ b/src/software/openlmi/software/LMI_SoftwareIdentity.py
@@ -0,0 +1,190 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Python Provider for LMI_SoftwareIdentity
+
+Instruments the CIM class LMI_SoftwareIdentity
+
+"""
+
+import pywbem
+from pywbem.cim_provider2 import CIMProvider2
+
+from openlmi.common import cmpi_logging
+from openlmi.software.core import SoftwareIdentity
+from openlmi.software.yumdb import YumDB
+
+class LMI_SoftwareIdentity(CIMProvider2):
+ """Instrument the CIM class LMI_SoftwareIdentity
+
+ SoftwareIdentity provides descriptive information about a software
+ component for asset tracking and/or installation dependency
+ management. When the IsEntity property has the value TRUE, the
+ instance of SoftwareIdentity represents an individually identifiable
+ entity similar to Physical Element. SoftwareIdentity does NOT indicate
+ whether the software is installed, executing, etc. This extra
+ information may be provided through specialized associations to
+ Software Identity. For instance, both InstalledSoftwareIdentity and
+ ElementSoftwareIdentity may be used to indicate that the software
+ identified by this class is installed. SoftwareIdentity is used when
+ managing the software components of a ManagedElement that is the
+ management focus. Since software may be acquired, SoftwareIdentity can
+ be associated with a Product using the ProductSoftwareComponent
+ relationship. The Application Model manages the deployment and
+ installation of software via the classes, SoftwareFeatures and
+ SoftwareElements. SoftwareFeature and SoftwareElement are used when
+ the software component is the management focus. The
+ deployment/installation concepts are related to the asset/identity
+ one. In fact, a SoftwareIdentity may correspond to a Product, or to
+ one or more SoftwareFeatures or SoftwareElements - depending on the
+ granularity of these classes and the deployment model. The
+ correspondence of Software Identity to Product, SoftwareFeature or
+ SoftwareElement is indicated using the ConcreteIdentity association.
+ Note that there may not be sufficient detail or instrumentation to
+ instantiate ConcreteIdentity. And, if the association is instantiated,
+ some duplication of information may result. For example, the Vendor
+ described in the instances of Product and SoftwareIdentity MAY be the
+ same. However, this is not necessarily true, and it is why vendor and
+ similar information are duplicated in this class. Note that
+ ConcreteIdentity can also be used to describe the relationship of the
+ software to any LogicalFiles that result from installing it. As above,
+ there may not be sufficient detail or instrumentation to instantiate
+ this association.
+
+ """
+
+ def __init__ (self, _env):
+ cmpi_logging.logger.debug('Initializing provider %s from %s' \
+ % (self.__class__.__name__, __file__))
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstance to be returned. The
+ key properties are set on this instance to correspond to the
+ instanceName that was requested. The properties of the model
+ are already filtered according to the PropertyList from the
+ request. Only properties present in the model need to be
+ given values. If you prefer, you can set all of the
+ values, and the instance will be filtered for you.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ with YumDB.get_instance():
+ pkg_info = SoftwareIdentity.object_path2pkg(model.path, 'all')
+ return SoftwareIdentity.pkg2model(
+ pkg_info, keys_only=False, model=model)
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances.
+
+ The WBEM operations EnumerateInstances and EnumerateInstanceNames
+ are both mapped to this method.
+ This method is a python generator
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstances to be generated.
+ The properties of the model are already filtered according to
+ the PropertyList from the request. Only properties present in
+ the model need to be given values. If you prefer, you can
+ always set all of the values, and the instance will be filtered
+ for you.
+ keys_only -- A boolean. True if only the key properties should be
+ set on the generated instances.
+
+ Possible Errors:
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ # Prime model.path with knowledge of the keys, so key values on
+ # the CIMInstanceName (model.path) will automatically be set when
+ # we set property values on the model.
+ model.path.update({'InstanceID': None})
+
+ with YumDB.get_instance() as ydb:
+ pkglist = ydb.get_package_list(
+ 'all', allow_duplicates=True, sort=True)
+ for pkg_info in pkglist:
+ yield SoftwareIdentity.pkg2model(
+ pkg_info, keys_only=keys_only, model=model)
+
+ @cmpi_logging.trace_method
+ def set_instance(self, env, instance, modify_existing):
+ """Return a newly created or modified instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance -- The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
+ the PropertyList from the request.
+ modify_existing -- True if ModifyInstance, False if CreateInstance
+
+ Return the new instance. The keys must be set on the new instance.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
+ valid if modify_existing is False, indicating that the operation
+ was CreateInstance)
+ CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
+ if modify_existing is True, indicating that the operation
+ was ModifyInstance)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
+ @cmpi_logging.trace_method
+ def delete_instance(self, env, instance_name):
+ """Delete an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance_name -- A pywbem.CIMInstanceName specifying the instance
+ to delete.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_NAMESPACE
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
+ namespace)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
diff --git a/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py b/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py
deleted file mode 100644
index 4206a46..0000000
--- a/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py
+++ /dev/null
@@ -1,451 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""Python Provider for LMI_SoftwareInstalledPackage
-
-Instruments the CIM class LMI_SoftwareInstalledPackage
-
-"""
-
-import pywbem
-from pywbem.cim_provider2 import CIMProvider2
-
-from openlmi.common import cmpi_logging
-from openlmi.software.core import (
- ComputerSystem, SoftwareFileCheck,
- SoftwareInstalledPackage, SoftwarePackage)
-from openlmi.software.yumdb import YumDB
-
-class LMI_SoftwareInstalledPackage(CIMProvider2):
- """Instrument the CIM class LMI_SoftwareInstalledPackage
-
- The InstalledSoftwareElement association allows the identification of
- the ComputerSystem on which a particular SoftwareElement is installed.
-
- """
-
- def __init__ (self, _env):
- cmpi_logging.logger.debug('Initializing provider %s from %s' \
- % (self.__class__.__name__, __file__))
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstance to be returned. The
- key properties are set on this instance to correspond to the
- instanceName that was requested. The properties of the model
- are already filtered according to the PropertyList from the
- request. Only properties present in the model need to be
- given values. If you prefer, you can set all of the
- values, and the instance will be filtered for you.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- if not "Software" in model:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Software property.")
- if not "System" in model:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing System property.")
- ComputerSystem.check_path_property(env, model, 'System')
- model['System'] = ComputerSystem.get_path()
- with YumDB.getInstance():
- pkg_info = SoftwarePackage.object_path2pkg(model['Software'],
- kind="installed")
- model['Software'] = SoftwarePackage.pkg2model(
- pkg_info, keys_only=True)
- return model
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances.
-
- The WBEM operations EnumerateInstances and EnumerateInstanceNames
- are both mapped to this method.
- This method is a python generator
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstances to be generated.
- The properties of the model are already filtered according to
- the PropertyList from the request. Only properties present in
- the model need to be given values. If you prefer, you can
- always set all of the values, and the instance will be filtered
- for you.
- keys_only -- A boolean. True if only the key properties should be
- set on the generated instances.
-
- Possible Errors:
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- # Prime model.path with knowledge of the keys, so key values on
- # the CIMInstanceName (model.path) will automatically be set when
- # we set property values on the model.
- model.path.update({'System': None, 'Software': None})
-
- yum_package_path = pywbem.CIMInstanceName("LMI_SoftwarePackage",
- namespace=model.path.namespace,
- host=model.path.host)
- model['System'] = ComputerSystem.get_path()
- with YumDB.getInstance() as ydb:
- pkglist = ydb.get_package_list('installed')
- for pkg in pkglist:
- iname = SoftwarePackage.pkg2model(pkg, model=yum_package_path)
- model['Software'] = iname
- yield model
-
- @cmpi_logging.trace_method
- def set_instance(self, env, instance, modify_existing):
- """Return a newly created or modified instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance -- The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
- the PropertyList from the request.
- modify_existing -- True if ModifyInstance, False if CreateInstance
-
- Return the new instance. The keys must be set on the new instance.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
- valid if modify_existing is False, indicating that the operation
- was CreateInstance)
- CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
- if modify_existing is True, indicating that the operation
- was ModifyInstance)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- # parse and check arguments
- if modify_existing is True:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
- "MofifyInstance is not supported")
-
- if not "Software" in instance:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Software property.")
- if not "System" in instance:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing System property.")
- ComputerSystem.check_path_property(env, instance, 'System')
-
- with YumDB.getInstance() as ydb:
- pkg_info = SoftwarePackage.object_path2pkg_search(
- instance['Software'])
-
- if pkg_info.installed:
- raise pywbem.CIMError(pywbem.CIM_ERR_ALREADY_EXISTS,
- "Package %s is already installed." % pkg_info)
-
- cmpi_logging.logger.info('installing package %s' % pkg_info)
- installed_pkg = ydb.install_package(pkg_info)
- cmpi_logging.logger.info('package %s installed' % pkg_info)
-
- instance["Software"] = SoftwarePackage.pkg2model(installed_pkg)
- return instance
-
- @cmpi_logging.trace_method
- def delete_instance(self, env, instance_name):
- """Delete an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance_name -- A pywbem.CIMInstanceName specifying the instance
- to delete.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
- namespace)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- if not "Software" in instance_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Software property.")
- if not "System" in instance_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing System property.")
- ComputerSystem.check_path_property(env, instance_name, 'System')
-
- with YumDB.getInstance() as ydb:
- pkg_info = SoftwarePackage.object_path2pkg(
- instance_name["Software"], kind="installed")
- cmpi_logging.logger.info('removing package "%s"' % pkg_info)
- ydb.remove_package(pkg_info)
- cmpi_logging.logger.info('package "%s" removed' % pkg_info)
-
- @cmpi_logging.trace_method
- def references(self, env, object_name, model, result_class_name, role,
- result_role, keys_only):
- """Instrument Associations.
-
- All four association-related operations (Associators, AssociatorNames,
- References, ReferenceNames) are mapped to this method.
- This method is a python generator
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName that defines the source
- CIM Object whose associated Objects are to be returned.
- model -- A template pywbem.CIMInstance to serve as a model
- of the objects to be returned. Only properties present on this
- model need to be set.
- result_class_name -- If not empty, this string acts as a filter on
- the returned set of Instances by mandating that each returned
- Instances MUST represent an association between object_name
- and an Instance of a Class whose name matches this parameter
- or a subclass.
- role -- If not empty, MUST be a valid Property name. It acts as a
- filter on the returned set of Instances by mandating that each
- returned Instance MUST refer to object_name via a Property
- whose name matches the value of this parameter.
- result_role -- If not empty, MUST be a valid Property name. It acts
- as a filter on the returned set of Instances by mandating that
- each returned Instance MUST represent associations of
- object_name to other Instances, where the other Instances play
- the specified result_role in the association (i.e. the
- name of the Property in the Association Class that refers to
- the Object related to object_name MUST match the value of this
- parameter).
- keys_only -- A boolean. True if only the key properties should be
- set on the generated instances.
-
- The following diagram may be helpful in understanding the role,
- result_role, and result_class_name parameters.
- +------------------------+ +-------------------+
- | object_name.classname | | result_class_name |
- | ~~~~~~~~~~~~~~~~~~~~~ | | ~~~~~~~~~~~~~~~~~ |
- +------------------------+ +-------------------+
- | +-----------------------------------+ |
- | | [Association] model.classname | |
- | object_name | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | |
- +--------------+ object_name.classname REF role | |
- (CIMInstanceName) | result_class_name REF result_role +------+
- | |(CIMInstanceName)
- +-----------------------------------+
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- cimhandle = env.get_cimom_handle()
-
- # If you want to get references for free, implemented in terms
- # of enum_instances, just leave the code below unaltered.
- if cimhandle.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='CIM_ComputerSystem') or \
- cimhandle.is_subclass(object_name.namespace,
- sub=object_name.classname,
- super='LMI_SoftwarePackage'):
- return self.simple_refs(env, object_name, model,
- result_class_name, role, result_role, keys_only)
-
- @cmpi_logging.trace_method
- def cim_method_checkintegrity(self, env, object_name):
- """Implements LMI_SoftwarePackage.CheckIntegrity()
-
- Verify existence and attributes of files installed from RPM
- package. If all files installed exist and their attributes
- matches, method returns "Pass". "Not passed" is returned when
- arbitrary file differs in its attributes. And "Error" if
- verification could not be done.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method CheckIntegrity()
- should be invoked.
-
- Returns a two-tuple containing the return value (
- type pywbem.Uint32 SoftwareInstalledPackage.Values.CheckIntegrity)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters:
- Failed -- (type REF (pywbem.CIMInstanceName(
- classname='LMI_SoftwareFileCheck', ...))
- Array of references to LMI_SoftwareFileCheck, that did not pass
- verification.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- if not "Software" in object_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Software property.")
- if not "System" in object_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing System property.")
-
- failed = []
- with YumDB.getInstance() as ydb:
- pkg_info = SoftwarePackage.object_path2pkg(object_name['Software'],
- kind="installed")
- pkg_check = ydb.check_package(pkg_info)
- for pkg_file in pkg_check.files.values():
- file_check = SoftwareFileCheck.test_file(
- pkg_check.file_checksum_type, pkg_file)
- if SoftwareFileCheck.filecheck_passed(file_check):
- continue
- failed.append(SoftwareFileCheck.filecheck2model(
- pkg_info, pkg_check, pkg_file.path,
- keys_only=True, file_check=file_check))
- out_params = [ pywbem.CIMParameter('Failed', type='reference',
- value=failed) ]
- return ( getattr(SoftwareInstalledPackage.Values.CheckIntegrity,
- 'Pass' if len(failed) == 0 else 'Not_passed')
- , out_params )
-
- @cmpi_logging.trace_method
- def cim_method_update(self, env, object_name,
- param_epoch=None,
- param_release=None,
- param_version=None):
- """Implements LMI_SoftwareInstalledPackage.Update()
-
- Updates the package to latest version. When any of "version" or
- "release" argument is given, install only matching available
- package. Otherwise try to update to newest package available.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method Update()
- should be invoked.
- param_release -- The input parameter release (type unicode)
- Specify particular release of package to update to. Update to
- newest, when empty
- param_version -- The input parameter version (type unicode)
- Specify particular version of package to update to. Update to
- newest, when empty
-
- Returns a two-tuple containing the return value (
- type pywbem.Uint16 SoftwareInstalledPackage.Values.Update)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters:
- Installed -- (type REF (pywbem.CIMInstanceName(
- classname='LMI_SoftwareInstalledPackage', ...))
- The reference to newly installed package, if installation was
- successful. Otherwise reference to current package.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- if not "Software" in object_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing Software property.")
- if not "System" in object_name:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Missing System property.")
- ComputerSystem.check_path_property(env, object_name, 'System')
-
- with YumDB.getInstance() as ydb:
- orig = SoftwarePackage.object_path2pkg(object_name['Software'],
- kind='installed')
-
- # NOTE: that we need to obtain all available and installed packages
- # because they are disjunctive and we want to select package
- # with highest version. Let the yum do the sorting of packages.
- pkglist = ydb.filter_packages('all',
- allow_duplicates=True, sort=True,
- name=orig.name,
- epoch=param_epoch,
- version=param_version,
- release=param_release,
- arch=orig.arch)
-
- if len(pkglist) < 1:
- cmpi_logging.logger.error(
- "desired package matching evr not found")
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "package matching desired evr not found among"
- " available packages")
- if len(pkglist) > 1:
- cmpi_logging.logger.info("multiple packages matching"
- " evr - selecting newest")
- cmpi_logging.logger.debug("matching packages (from oldest):"
- " [%s]" % ", ".join([str(p) for p in pkglist]))
- desired_pkg = pkglist[-1]
-
- out_params = [pywbem.CIMParameter('Installed', type='reference')]
- if desired_pkg.installed:
- out_params[0].value = SoftwarePackage.pkg2model(desired_pkg)
- cmpi_logging.logger.info('already up to date')
- return ( SoftwareInstalledPackage.Values.Update.Already_newest
- , out_params)
-
- cmpi_logging.logger.info(
- 'trying to update package \"%s\" to "%s"' % (
- orig, desired_pkg))
- installed_pkg = ydb.update_to_package(desired_pkg)
- cmpi_logging.logger.info('update successful')
-
- out_params[0].value = SoftwarePackage.pkg2model(installed_pkg)
-
- return ( SoftwareInstalledPackage.Values.Update.Successful_installation
- , out_params)
-
diff --git a/src/software/openlmi/software/LMI_SoftwarePackage.py b/src/software/openlmi/software/LMI_SoftwarePackage.py
deleted file mode 100644
index 17483e8..0000000
--- a/src/software/openlmi/software/LMI_SoftwarePackage.py
+++ /dev/null
@@ -1,258 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""Python Provider for LMI_SoftwarePackage
-
-Instruments the CIM class LMI_SoftwarePackage
-
-"""
-
-import pywbem
-from pywbem.cim_provider2 import CIMProvider2
-
-from openlmi.common import cmpi_logging
-from openlmi.software.core import SoftwarePackage
-from openlmi.software.yumdb import YumDB
-
-class LMI_SoftwarePackage(CIMProvider2):
- """Instrument the CIM class LMI_SoftwarePackage
-
- RPM package installed on particular computer system with YUM (The
- Yellowdog Updated, Modified) package manager.
-
- """
-
- def __init__(self, _env):
- cmpi_logging.logger.debug('Initializing provider %s from %s' \
- % (self.__class__.__name__, __file__))
-
- @cmpi_logging.trace_method
- def get_instance(self, env, model):
- """Return an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstance to be returned. The
- key properties are set on this instance to correspond to the
- instanceName that was requested. The properties of the model
- are already filtered according to the PropertyList from the
- request. Only properties present in the model need to be
- given values. If you prefer, you can set all of the
- values, and the instance will be filtered for you.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- with YumDB.getInstance():
- pkg_info = SoftwarePackage.object_path2pkg(model.path, 'all')
- return SoftwarePackage.pkg2model(
- pkg_info, keys_only=False, model=model)
-
- @cmpi_logging.trace_method
- def enum_instances(self, env, model, keys_only):
- """Enumerate instances.
-
- The WBEM operations EnumerateInstances and EnumerateInstanceNames
- are both mapped to this method.
- This method is a python generator
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- model -- A template of the pywbem.CIMInstances to be generated.
- The properties of the model are already filtered according to
- the PropertyList from the request. Only properties present in
- the model need to be given values. If you prefer, you can
- always set all of the values, and the instance will be filtered
- for you.
- keys_only -- A boolean. True if only the key properties should be
- set on the generated instances.
-
- Possible Errors:
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- # Prime model.path with knowledge of the keys, so key values on
- # the CIMInstanceName (model.path) will automatically be set when
- # we set property values on the model.
- model.path.update({'TargetOperatingSystem': None, 'Version': None,
- 'SoftwareElementState': None, 'Name': None,
- 'SoftwareElementID': None})
-
- with YumDB.getInstance() as ydb:
- # get all packages
- pkglist = ydb.get_package_list('all',
- allow_duplicates=True, sort=True)
- for pkg in pkglist:
- yield SoftwarePackage.pkg2model(pkg,
- keys_only=keys_only, model=model)
-
- @cmpi_logging.trace_method
- def set_instance(self, env, instance, modify_existing):
- """Return a newly created or modified instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance -- The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
- the PropertyList from the request.
- modify_existing -- True if ModifyInstance, False if CreateInstance
-
- Return the new instance. The keys must be set on the new instance.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
- valid if modify_existing is False, indicating that the operation
- was CreateInstance)
- CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
- if modify_existing is True, indicating that the operation
- was ModifyInstance)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def delete_instance(self, env, instance_name):
- """Delete an instance.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- instance_name -- A pywbem.CIMInstanceName specifying the instance
- to delete.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_NOT_SUPPORTED
- CIM_ERR_INVALID_NAMESPACE
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
- or otherwise incorrect parameters)
- CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
- namespace)
- CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
- Instance does not exist in the specified namespace)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
-
- @cmpi_logging.trace_method
- def cim_method_install(self, env, object_name):
- """Implements LMI_SoftwarePackage.Install()
-
- Will install available package.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method Update()
- should be invoked.
-
- Returns a two-tuple containing the return value (
- type pywbem.Uint32 SoftwarePackage.Values.Install)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters:
- Installed -- (type REF (pywbem.CIMInstanceName(
- classname='LMI_SoftwareInstalledPackage', ...))
- The reference to newly installed package, if installation was
- successful. Null otherwise.
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- with YumDB.getInstance() as ydb:
- # get available packages
- pkg_info = SoftwarePackage.object_path2pkg_search(object_name)
- out_params = [ pywbem.CIMParameter('Installed', type='reference') ]
- if pkg_info.installed:
- out_params[0].value = SoftwarePackage.pkg2model(
- pkg_info, keys_only=True)
- return ( SoftwarePackage.Values.Install.Already_installed
- , out_params)
-
- cmpi_logging.logger.info('installing package %s' % pkg_info)
- installed_pkg = ydb.install_package(pkg_info)
- cmpi_logging.logger.info('package %s installed' % pkg_info)
-
- out_params[0].value = SoftwarePackage.pkg2model(
- installed_pkg, keys_only=True)
- return ( SoftwarePackage.Values.Install.Successful_installation
- , out_params)
-
- @cmpi_logging.trace_method
- def cim_method_remove(self, env, object_name):
- """Implements LMI_SoftwarePackage.Remove()
-
- Will uninstall installed package.
-
- Keyword arguments:
- env -- Provider Environment (pycimmb.ProviderEnvironment)
- object_name -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method Remove()
- should be invoked.
-
- Returns a two-tuple containing the return value (
- type pywbem.Uint32 SoftwarePackage.Values.Remove)
- and a list of CIMParameter objects representing the output parameters
-
- Output parameters: none
-
- Possible Errors:
- CIM_ERR_ACCESS_DENIED
- CIM_ERR_INVALID_PARAMETER (including missing, duplicate,
- unrecognized or otherwise incorrect parameters)
- CIM_ERR_NOT_FOUND (the target CIM Class or instance does not
- exist in the specified namespace)
- CIM_ERR_METHOD_NOT_AVAILABLE (the CIM Server is unable to honor
- the invocation request)
- CIM_ERR_FAILED (some other unspecified error occurred)
-
- """
- with YumDB.getInstance() as ydb:
- pkg_info = SoftwarePackage.object_path2pkg(object_name, 'all')
- if pkg_info.installed:
- cmpi_logging.logger.info('removing package %s' % pkg_info)
- ydb.remove_package(pkg_info)
- cmpi_logging.logger.info('package %s removed' % pkg_info)
- rval = SoftwarePackage.Values.Remove.Successful_removal
- else:
- rval = SoftwarePackage.Values.Remove.Not_installed
- return (rval, [])
-
diff --git a/src/software/openlmi/software/LMI_SystemSoftwareCollection.py b/src/software/openlmi/software/LMI_SystemSoftwareCollection.py
new file mode 100644
index 0000000..ec2382b
--- /dev/null
+++ b/src/software/openlmi/software/LMI_SystemSoftwareCollection.py
@@ -0,0 +1,174 @@
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Python Provider for LMI_SystemSoftwareCollection
+
+Instruments the CIM class LMI_SystemSoftwareCollection
+"""
+
+import pywbem
+from pywbem.cim_provider2 import CIMProvider2
+
+from openlmi.common import cmpi_logging
+from openlmi.software.core import SystemSoftwareCollection
+
+class LMI_SystemSoftwareCollection(CIMProvider2):
+ """Instrument the CIM class LMI_SystemSoftwareCollection
+
+ SystemSoftwareCollection represents the general concept of a collection
+ that is scoped (or contained) by a System. It represents a Collection
+ that has meaning only in the context of a System, a Collection whose
+ elements are restricted by the definition of the System, or both of
+ these types of Collections. This meaning is explicitly described by
+ the (required) association, HostedCollection. An example of a
+ SystemSoftwareCollection is a Fibre Channel zone that collects network
+ ports, port groupings, and aliases (as required by a customer) in the
+ context of an AdminDomain. The Collection is not a part of the domain,
+ but merely an arbitrary grouping of the devices and other Collections
+ in the domain. In other words, the context of the Collection is
+ restricted to the domain, and its members are also limited by the
+ domain.
+
+ """
+
+ def __init__ (self, _env):
+ cmpi_logging.logger.debug('Initializing provider %s from %s' \
+ % (self.__class__.__name__, __file__))
+
+ @cmpi_logging.trace_method
+ def get_instance(self, env, model):
+ """Return an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstance to be returned. The
+ key properties are set on this instance to correspond to the
+ instanceName that was requested. The properties of the model
+ are already filtered according to the PropertyList from the
+ request. Only properties present in the model need to be
+ given values. If you prefer, you can set all of the
+ values, and the instance will be filtered for you.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ if not 'InstanceID' in model:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Missing InstanceID key property")
+ if model['InstanceID'] != \
+ SystemSoftwareCollection.get_path()['InstanceID']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No such instance.")
+
+ model['Caption'] = "System RPM Package Collection"
+ #model['Description'] = '' # TODO
+ #model['ElementName'] = '' # TODO
+ return model
+
+ @cmpi_logging.trace_method
+ def enum_instances(self, env, model, keys_only):
+ """Enumerate instances.
+
+ The WBEM operations EnumerateInstances and EnumerateInstanceNames
+ are both mapped to this method.
+ This method is a python generator
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ model -- A template of the pywbem.CIMInstances to be generated.
+ The properties of the model are already filtered according to
+ the PropertyList from the request. Only properties present in
+ the model need to be given values. If you prefer, you can
+ always set all of the values, and the instance will be filtered
+ for you.
+ keys_only -- A boolean. True if only the key properties should be
+ set on the generated instances.
+
+ Possible Errors:
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ # Prime model.path with knowledge of the keys, so key values on
+ # the CIMInstanceName (model.path) will automatically be set when
+ # we set property values on the model.
+ model.path.update({'InstanceID': None})
+
+ model['InstanceID'] = SystemSoftwareCollection.get_path()["InstanceID"]
+ if keys_only is False:
+ yield self.get_instance(env, model)
+ else:
+ yield model
+
+ @cmpi_logging.trace_method
+ def set_instance(self, env, instance, modify_existing):
+ """Return a newly created or modified instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance -- The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
+ the PropertyList from the request.
+ modify_existing -- True if ModifyInstance, False if CreateInstance
+
+ Return the new instance. The keys must be set on the new instance.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only
+ valid if modify_existing is False, indicating that the operation
+ was CreateInstance)
+ CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid
+ if modify_existing is True, indicating that the operation
+ was ModifyInstance)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
+ @cmpi_logging.trace_method
+ def delete_instance(self, env, instance_name):
+ """Delete an instance.
+
+ Keyword arguments:
+ env -- Provider Environment (pycimmb.ProviderEnvironment)
+ instance_name -- A pywbem.CIMInstanceName specifying the instance
+ to delete.
+
+ Possible Errors:
+ CIM_ERR_ACCESS_DENIED
+ CIM_ERR_NOT_SUPPORTED
+ CIM_ERR_INVALID_NAMESPACE
+ CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized
+ or otherwise incorrect parameters)
+ CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified
+ namespace)
+ CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM
+ Instance does not exist in the specified namespace)
+ CIM_ERR_FAILED (some other unspecified error occurred)
+
+ """
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
+
diff --git a/src/software/openlmi/software/__init__.py b/src/software/openlmi/software/__init__.py
index 2ebe827..55b7d1b 100644
--- a/src/software/openlmi/software/__init__.py
+++ b/src/software/openlmi/software/__init__.py
@@ -18,3 +18,8 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+
+"""
+CIM providers for software management.
+Part of OpenLMI project.
+"""
diff --git a/src/software/openlmi/software/cimom_entry.py b/src/software/openlmi/software/cimom_entry.py
index 9e6e10c..156f63a 100644
--- a/src/software/openlmi/software/cimom_entry.py
+++ b/src/software/openlmi/software/cimom_entry.py
@@ -25,28 +25,34 @@ Entry module for OpenLMI Software proviers.
"""
from openlmi.common import cmpi_logging
-from openlmi.software.LMI_SoftwarePackage import LMI_SoftwarePackage
-from openlmi.software.LMI_SoftwareInstalledPackage import \
- LMI_SoftwareInstalledPackage
-from openlmi.software.LMI_SoftwareFileCheck import LMI_SoftwareFileCheck
-from openlmi.software.LMI_SoftwarePackageChecks import \
- LMI_SoftwarePackageChecks
+from openlmi.software.LMI_SoftwareIdentity import LMI_SoftwareIdentity
+from openlmi.software.LMI_SystemSoftwareCollection import \
+ LMI_SystemSoftwareCollection
+from openlmi.software.LMI_HostedSoftwareCollection import \
+ LMI_HostedSoftwareCollection
+from openlmi.software.LMI_MemberOfSoftwareCollection import \
+ LMI_MemberOfSoftwareCollection
from openlmi.software.yumdb import YumDB
def get_providers(env):
+ """
+ @return mapping of provider names to corresponding provider instances.
+ """
cmpi_logging.LogManager(env)
providers = {
- "LMI_SoftwarePackage" : LMI_SoftwarePackage(env),
- "LMI_SoftwareInstalledPackage" : LMI_SoftwareInstalledPackage(env),
- "LMI_SoftwareFileCheck" : LMI_SoftwareFileCheck(env),
- "LMI_SoftwarePackageChecks" : LMI_SoftwarePackageChecks(env)
+ "LMI_SoftwareIdentity" : LMI_SoftwareIdentity(env),
+ "LMI_SystemSoftwareCollection" : LMI_SystemSoftwareCollection(env),
+ "LMI_HostedSoftwareCollection" : LMI_HostedSoftwareCollection(env),
+ "LMI_MemberOfSoftwareCollection" : LMI_MemberOfSoftwareCollection(env)
}
return providers
def can_unload(_env):
+ """ Says, whether providers can be unlouded. """
return True
def shutdown(_env):
- YumDB.getInstance().clean_up()
+ """ Release resources upon cleanup. """
+ YumDB.get_instance().clean_up()
diff --git a/src/software/openlmi/software/core/ComputerSystem.py b/src/software/openlmi/software/core/ComputerSystem.py
index 90699a0..4f99ee3 100644
--- a/src/software/openlmi/software/core/ComputerSystem.py
+++ b/src/software/openlmi/software/core/ComputerSystem.py
@@ -40,9 +40,12 @@ def check_path_property(env, op, prop_name):
Linux_ComputerSystem corresponding to this system.
If not, an exception is raised.
"""
+ if not prop_name in op:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Missing %s key property!" % prop_name)
system = op[prop_name]
if not isinstance(system, pywbem.CIMInstanceName):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
"\"%s\" must be a CIMInstanceName" % prop_name)
our_system = get_path(prefix='CIM')
ch = env.get_cimom_handle()
diff --git a/src/software/openlmi/software/core/SoftwareFileCheck.py b/src/software/openlmi/software/core/SoftwareFileCheck.py
deleted file mode 100644
index 409db09..0000000
--- a/src/software/openlmi/software/core/SoftwareFileCheck.py
+++ /dev/null
@@ -1,515 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""
-Just a common functionality related to SoftwareFileCheck provider.
-"""
-
-import collections
-import hashlib
-import os
-import pywbem
-import stat
-import yum
-
-from openlmi.common import cmpi_logging
-from openlmi.software import util
-from openlmi.software.yumdb import YumDB
-from openlmi.software.yumdb import packageinfo
-from openlmi.software.yumdb import packagecheck
-
-PASSED_FLAGS_DESCRIPTIONS = (
- "Existence",
- "File Type",
- "File Size",
- "File Mode",
- "File Checksum",
- "Device major/minor number",
- "Symlink Target",
- "User Ownership", "Group Ownership",
- "Modify Time")
-
-# Named tuple to store results of rpm file check as pywbem values, all results
-# are in the form:
-# (expected, reality)
-# where
-# expected is value from rpm package
-# reality is value obtained from installed file
-# None means, that value could not be obtained. Except for "exists" and
-# "md5_checksum" attributes, where "exists" is boolean and "md5_checksum" is
-# a string.
-# for example:
-# file_check.file_type == (4, 3)
-FileCheck = collections.namedtuple('FileCheck', #pylint: disable=C0103
- 'exists, md5_checksum, file_type, file_size, file_mode, '
- 'file_checksum, device, link_target, user_id, group_id, '
- 'last_modification_time')
-
-@cmpi_logging.trace_function
-def checksumtype_num2hash(csumt):
- """
- @param csumt checksum type as a number obtained from package
- @return hash function object corresponding to csumt
- """
- return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])
-
-@cmpi_logging.trace_function
-def checksumtype_str2pywbem(alg):
- """
- @param alg is a name of algorithm used for checksum
- @return pywbem number corresponding to given alg
- """
- try:
- res = packagecheck.CHECKSUMTYPE_STR2NUM[alg.lower()]
- except KeyError:
- res = 0
- return pywbem.Uint16(res)
-
-@cmpi_logging.trace_function
-def filetype_str2pywbem(file_type):
- """
- @param file_type is a name of file type obtained from pkg headers
- @return pywbem number corresponding to thus file type
- """
- try:
- return pywbem.Uint16(
- { 'file' : Values.FileType.File
- , 'directory' : Values.FileType.Directory
- , 'symlink' : Values.FileType.Symlink
- , 'fifo' : Values.FileType.FIFO
- , 'character device' : Values.FileType.Character_Device
- , 'block device' : Values.FileType.Block_Device
- }[file_type])
- except KeyError:
- return Values.FileType.Unknown
-
-@cmpi_logging.trace_function
-def filetype_mode2pywbem(mode):
- """
- @param mode is a raw file mode as integer
- @return pywbem numeric value of file's type
- """
- for i, name in enumerate(
- ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
- if getattr(stat, 'S_IS' + name)(mode):
- return pywbem.Uint16(i)
- return pywbem.Uint16(0)
-
-@cmpi_logging.trace_function
-def mode2pywbem_flags(mode):
- """
- @param mode if None, file does not exist
- @return list of integer flags describing file's access permissions
- """
- if mode is None:
- return None
- flags = []
- for i, flag in enumerate((
- stat.S_IXOTH,
- stat.S_IWOTH,
- stat.S_IROTH,
- stat.S_IXGRP,
- stat.S_IWGRP,
- stat.S_IRGRP,
- stat.S_IXUSR,
- stat.S_IWUSR,
- stat.S_IRUSR,
- stat.S_ISVTX,
- stat.S_ISGID,
- stat.S_ISUID)):
- if flag & mode:
- flags.append(pywbem.Uint8(i))
- return flags
-
-@cmpi_logging.trace_function
-def hashfile(afile, hashers, blocksize=65536):
- """
- @param hashers is a list of hash objects
- @return list of digest strings (in hex format) for each hash object
- given in the same order
- """
- if not isinstance(hashers, (tuple, list, set, frozenset)):
- hashers = (hashers, )
- buf = afile.read(blocksize)
- while len(buf) > 0:
- for hashfunc in hashers:
- hashfunc.update(buf)
- buf = afile.read(blocksize)
- return [ hashfunc.hexdigest() for hashfunc in hashers ]
-
-@cmpi_logging.trace_function
-def compute_checksums(checksum_type, file_type, file_path):
- """
- @param file_type is not a file, then zeroes are returned
- @param checksum_type selected hash algorithm to compute second
- checksum
- @return (md5sum, checksum)
- both checksums are computed from file_path's content
- first one is always md5, the second one depends on checksum_type
- if file does not exists, (None, None) is returned
- """
- hashers = [hashlib.md5()] #pylint: disable=E1101
- if checksum_type != packagecheck.CHECKSUMTYPE_STR2NUM["md5"]:
- hashers.append(checksumtype_num2hash(checksum_type)())
- if file_type != filetype_str2pywbem('file'):
- rslts = ['0'*len(h.hexdigest()) for h in hashers]
- else:
- try:
- with open(file_path, 'rb') as fobj:
- rslts = hashfile(fobj, hashers)
- except (OSError, IOError) as exc:
- cmpi_logging.logger.error("could not open file \"%s\""
- " for reading: %s", file_path, exc)
- return None, None
- return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)
-
-@cmpi_logging.trace_function
-def object_path2pkg_file(objpath):
- """
- @return (package_info, package_check)
- """
- if not isinstance(objpath, pywbem.CIMInstanceName):
- raise TypeError("objpath must be instance of CIMInstanceName, "
- "not \"%s\"" % objpath.__class__.__name__)
-
- if ( not objpath['Name'] or not objpath['SoftwareElementID']
- or not objpath['CheckID']
- or not objpath['CheckID'].endswith('#'+objpath['Name'])
- or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if objpath['SoftwareElementState'] not in ("2", 2):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Only \"Executable\" software element state supported")
- if not util.check_target_operating_system(objpath['TargetOperatingSystem']):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong target operating system.")
- if not objpath['Name'] or not objpath['Version']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- 'Both "Name" and "Version" must be given')
- match = util.RE_NEVRA_OPT_EPOCH.match(objpath['SoftwareElementID'])
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong SotwareElementID. Expected valid nevra"
- " (name-epoch:version-release.arch).")
- if objpath['Version'] != match.group('ver'):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Version does not match version part in SoftwareElementID.")
-
- with YumDB.getInstance() as ydb:
- pkglist = ydb.filter_packages('installed', **util.nevra2filter(match))
- if len(pkglist) < 1:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package installed.")
- pkg = pkglist[0]
- pkg_check = ydb.check_package(pkg)
- return (pkg, pkg_check, pkg_check[objpath["Name"]])
-
-@cmpi_logging.trace_function
-def test_file(checksum_type, package_file):
- """
- @param checksum type is a pywbem value for ChecksumType property
- @return instance of FileCheck
- """
- if not isinstance(package_file, packagecheck.PackageFile):
- raise TypeError("package_file must be an instance of PackageFile"
- " not \"%s\"" % package_file.__class__.__name__)
- exists = os.path.lexists(package_file.path)
- md5_checksum = None
- expected = {
- "file_type" : filetype_str2pywbem(package_file.file_type),
- "user_id" : pywbem.Uint32(package_file.uid),
- "group_id" : pywbem.Uint32(package_file.gid),
- "file_mode" : pywbem.Uint32(package_file.mode),
- "file_size" : pywbem.Uint64(package_file.size),
- "link_target" : package_file.link_target,
- "file_checksum" : package_file.checksum,
- "device" : pywbem.Uint64(package_file.device)
- if package_file.device is not None else None,
- "last_modification_time" : pywbem.Uint64(package_file.mtime)
- }
- if not exists:
- reality = collections.defaultdict(lambda: None)
- else:
- fstat = os.lstat(package_file.path)
- reality = {
- "file_type" : filetype_mode2pywbem(fstat.st_mode),
- "user_id" : pywbem.Uint32(fstat.st_uid),
- "group_id" : pywbem.Uint32(fstat.st_gid),
- "file_mode" : pywbem.Uint32(fstat.st_mode),
- "file_size" : pywbem.Uint64(fstat.st_size),
- "last_modification_time" : pywbem.Uint64(fstat.st_mtime)
- }
- reality["device"] = (
- pywbem.Uint64(fstat.st_dev)
- if reality['file_type'] == filetype_str2pywbem("device")
- else None)
- reality["link_target"] = (os.readlink(package_file.path)
- if os.path.islink(package_file.path) else None)
- md5_checksum, checksum = compute_checksums(
- checksum_type, reality["file_type"], package_file.path)
- reality["file_checksum"] = checksum
- kwargs = dict(exists=exists, md5_checksum=md5_checksum,
- **dict((k, (expected[k], reality[k])) for k in expected))
- return FileCheck(**kwargs)
-
-@cmpi_logging.trace_function
-def _filecheck2model_flags(file_check):
- """
- @param file_check is an instance of FileCheck
- @return pywbem value for PassedFlags property
- """
- if not isinstance(file_check, FileCheck):
- raise TypeError("file_check must be an instance of FileCheck")
- flags = []
- for k, value in file_check._asdict().items(): #pylint: disable=W0212
- if isinstance(value, tuple):
- if ( k in ("last_modification_time", "file_size")
- and file_check.file_type[0] != filetype_str2pywbem('file')):
- # last_modification_time check is valid only for
- # regular files
- flag = file_check.exists
- elif ( k == "file_mode"
- and file_check.file_type[0] == filetype_str2pywbem('symlink')):
- # do not check mode of symlinks
- flag = ( file_check.exists
- and file_check.file_type[0] == file_check.file_type[1])
- else:
- flag = file_check.exists and value[0] == value[1]
- flags.append(flag)
- elif isinstance(value, bool):
- flags.append(value)
- return flags
-
-@cmpi_logging.trace_function
-def filecheck_passed(file_check):
- """
- @return True if installed file passed all checks.
- """
- return all(_filecheck2model_flags(file_check))
-
-@cmpi_logging.trace_function
-def _fill_non_key_values(model, pkg_check, pkg_file, file_check=None):
- """
- Fills a non key values into instance of SoftwareFileCheck.
- """
- model['FileName'] = os.path.basename(pkg_file.path)
- model['FileChecksumType'] = csumt = pywbem.Uint16(
- pkg_check.file_checksum_type)
- if file_check is None:
- file_check = test_file(csumt, pkg_file)
- for mattr, fattr in (
- ('FileType', 'file_type'),
- ('FileUserID', 'user_id'),
- ('FileGroupID', 'group_id'),
- ('FileMode', 'file_mode'),
- ('LastModificationTime', 'last_modification_time'),
- ('FileSize', 'file_size'),
- ('LinkTarget', 'link_target'),
- ('FileChecksum', 'file_checksum')):
- exp, rea = getattr(file_check, fattr)
- if exp is not None:
- model['Expected' + mattr] = exp
- if rea is not None:
- model[mattr] = rea
- model['ExpectedFileModeFlags'] = mode2pywbem_flags(file_check.file_mode[0])
- if file_check.exists:
- model['FileModeFlags'] = mode2pywbem_flags(file_check.file_mode[1])
- model['FileExists'] = file_check.exists
- if file_check.md5_checksum is not None:
- model['MD5Checksum'] = file_check.md5_checksum
- model['PassedFlags'] = _filecheck2model_flags(file_check)
- model['PassedFlagsDescriptions'] = list(PASSED_FLAGS_DESCRIPTIONS)
-
-@cmpi_logging.trace_function
-def filecheck2model(package_info, package_check, file_name, keys_only=True,
- model=None, file_check=None):
- """
- @param package_file is an instance of yumdb.PackageFile
- @param file_name a absolute file path contained in package
- @param keys_only if True, then only key values will be filed
- @param model if given, then this instance will be modified and
- returned
- @param file_check if not given, it will be computed
- @return instance of LMI_SoftwareFileCheck class with all desired
- values filed
- """
- if not isinstance(package_info, packageinfo.PackageInfo):
- raise TypeError(
- "package_info must be an instance ofyumdb.PackageInfo")
- if not isinstance(package_check, packagecheck.PackageCheck):
- raise TypeError(
- "package_check must be an instance of yumdb.PackageFile")
- if not file_name in package_check:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File \"%s\" not found among package files" % file_name)
- if model is None:
- model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck",
- namespace="root/cimv2")
- if not keys_only:
- model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model)
- package_file = package_check[file_name]
- model['Name'] = package_file.path
- model['SoftwareElementID'] = package_info.nevra
- model['SoftwareElementState'] = Values.SoftwareElementState.Executable
- model['TargetOperatingSystem'] = pywbem.Uint16(
- util.get_target_operating_system()[0])
- model['Version'] = package_info.version
- model['CheckID'] = '%s#%s' % (package_info.name, package_file.path)
- if not keys_only:
- if file_check is not None:
- if not isinstance(file_check, FileCheck):
- raise TypeError("file_check must be an instance of FileCheck")
- _fill_non_key_values(model, package_check, package_file, file_check)
- return model
-
-class Values(object):
- """
- Enumerations of LMI_SoftwareFileCheck class properties.
- """
- class TargetOperatingSystem(object):
- Unknown = pywbem.Uint16(0)
- Other = pywbem.Uint16(1)
- MACOS = pywbem.Uint16(2)
- ATTUNIX = pywbem.Uint16(3)
- DGUX = pywbem.Uint16(4)
- DECNT = pywbem.Uint16(5)
- Tru64_UNIX = pywbem.Uint16(6)
- OpenVMS = pywbem.Uint16(7)
- HPUX = pywbem.Uint16(8)
- AIX = pywbem.Uint16(9)
- MVS = pywbem.Uint16(10)
- OS400 = pywbem.Uint16(11)
- OS_2 = pywbem.Uint16(12)
- JavaVM = pywbem.Uint16(13)
- MSDOS = pywbem.Uint16(14)
- WIN3x = pywbem.Uint16(15)
- WIN95 = pywbem.Uint16(16)
- WIN98 = pywbem.Uint16(17)
- WINNT = pywbem.Uint16(18)
- WINCE = pywbem.Uint16(19)
- NCR3000 = pywbem.Uint16(20)
- NetWare = pywbem.Uint16(21)
- OSF = pywbem.Uint16(22)
- DC_OS = pywbem.Uint16(23)
- Reliant_UNIX = pywbem.Uint16(24)
- SCO_UnixWare = pywbem.Uint16(25)
- SCO_OpenServer = pywbem.Uint16(26)
- Sequent = pywbem.Uint16(27)
- IRIX = pywbem.Uint16(28)
- Solaris = pywbem.Uint16(29)
- SunOS = pywbem.Uint16(30)
- U6000 = pywbem.Uint16(31)
- ASERIES = pywbem.Uint16(32)
- HP_NonStop_OS = pywbem.Uint16(33)
- HP_NonStop_OSS = pywbem.Uint16(34)
- BS2000 = pywbem.Uint16(35)
- LINUX = pywbem.Uint16(36)
- Lynx = pywbem.Uint16(37)
- XENIX = pywbem.Uint16(38)
- VM = pywbem.Uint16(39)
- Interactive_UNIX = pywbem.Uint16(40)
- BSDUNIX = pywbem.Uint16(41)
- FreeBSD = pywbem.Uint16(42)
- NetBSD = pywbem.Uint16(43)
- GNU_Hurd = pywbem.Uint16(44)
- OS9 = pywbem.Uint16(45)
- MACH_Kernel = pywbem.Uint16(46)
- Inferno = pywbem.Uint16(47)
- QNX = pywbem.Uint16(48)
- EPOC = pywbem.Uint16(49)
- IxWorks = pywbem.Uint16(50)
- VxWorks = pywbem.Uint16(51)
- MiNT = pywbem.Uint16(52)
- BeOS = pywbem.Uint16(53)
- HP_MPE = pywbem.Uint16(54)
- NextStep = pywbem.Uint16(55)
- PalmPilot = pywbem.Uint16(56)
- Rhapsody = pywbem.Uint16(57)
- Windows_2000 = pywbem.Uint16(58)
- Dedicated = pywbem.Uint16(59)
- OS_390 = pywbem.Uint16(60)
- VSE = pywbem.Uint16(61)
- TPF = pywbem.Uint16(62)
- Windows__R__Me = pywbem.Uint16(63)
- Caldera_Open_UNIX = pywbem.Uint16(64)
- OpenBSD = pywbem.Uint16(65)
- Not_Applicable = pywbem.Uint16(66)
- Windows_XP = pywbem.Uint16(67)
- z_OS = pywbem.Uint16(68)
- Microsoft_Windows_Server_2003 = pywbem.Uint16(69)
- Microsoft_Windows_Server_2003_64_Bit = pywbem.Uint16(70)
- Windows_XP_64_Bit = pywbem.Uint16(71)
- Windows_XP_Embedded = pywbem.Uint16(72)
- Windows_Vista = pywbem.Uint16(73)
- Windows_Vista_64_Bit = pywbem.Uint16(74)
- Windows_Embedded_for_Point_of_Service = pywbem.Uint16(75)
- Microsoft_Windows_Server_2008 = pywbem.Uint16(76)
- Microsoft_Windows_Server_2008_64_Bit = pywbem.Uint16(77)
- FreeBSD_64_Bit = pywbem.Uint16(78)
- RedHat_Enterprise_Linux = pywbem.Uint16(79)
- RedHat_Enterprise_Linux_64_Bit = pywbem.Uint16(80)
- Solaris_64_Bit = pywbem.Uint16(81)
- SUSE = pywbem.Uint16(82)
- SUSE_64_Bit = pywbem.Uint16(83)
- SLES = pywbem.Uint16(84)
- SLES_64_Bit = pywbem.Uint16(85)
- Novell_OES = pywbem.Uint16(86)
- Novell_Linux_Desktop = pywbem.Uint16(87)
- Sun_Java_Desktop_System = pywbem.Uint16(88)
- Mandriva = pywbem.Uint16(89)
- Mandriva_64_Bit = pywbem.Uint16(90)
- TurboLinux = pywbem.Uint16(91)
- TurboLinux_64_Bit = pywbem.Uint16(92)
- Ubuntu = pywbem.Uint16(93)
- Ubuntu_64_Bit = pywbem.Uint16(94)
- Debian = pywbem.Uint16(95)
- Debian_64_Bit = pywbem.Uint16(96)
- Linux_2_4_x = pywbem.Uint16(97)
- Linux_2_4_x_64_Bit = pywbem.Uint16(98)
- Linux_2_6_x = pywbem.Uint16(99)
- Linux_2_6_x_64_Bit = pywbem.Uint16(100)
- Linux_64_Bit = pywbem.Uint16(101)
- Other_64_Bit = pywbem.Uint16(102)
- Microsoft_Windows_Server_2008_R2 = pywbem.Uint16(103)
- VMware_ESXi = pywbem.Uint16(104)
- Microsoft_Windows_7 = pywbem.Uint16(105)
- CentOS_32_bit = pywbem.Uint16(106)
- CentOS_64_bit = pywbem.Uint16(107)
- Oracle_Enterprise_Linux_32_bit = pywbem.Uint16(108)
- Oracle_Enterprise_Linux_64_bit = pywbem.Uint16(109)
- eComStation_32_bitx = pywbem.Uint16(110)
-
- class SoftwareElementState(object):
- Deployable = pywbem.Uint16(0)
- Installable = pywbem.Uint16(1)
- Executable = pywbem.Uint16(2)
- Running = pywbem.Uint16(3)
-
- class FileType(object):
- Unknown = pywbem.Uint16(0)
- File = pywbem.Uint16(1)
- Directory = pywbem.Uint16(2)
- Symlink = pywbem.Uint16(3)
- FIFO = pywbem.Uint16(4)
- Character_Device = pywbem.Uint16(5)
- Block_Device = pywbem.Uint16(6)
-
diff --git a/src/software/openlmi/software/core/SoftwareIdentity.py b/src/software/openlmi/software/core/SoftwareIdentity.py
new file mode 100644
index 0000000..1c4b9fe
--- /dev/null
+++ b/src/software/openlmi/software/core/SoftwareIdentity.py
@@ -0,0 +1,278 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Just a common functionality related to LMI_SoftwareIdentity provider.
+"""
+
+import pywbem
+
+from openlmi.common import cmpi_logging
+from openlmi.software import util
+from openlmi.software.yumdb import PackageInfo, YumDB
+
+class Values(object):
+ class DetailedStatus(object):
+ Not_Available = pywbem.Uint16(0)
+ No_Additional_Information = pywbem.Uint16(1)
+ Stressed = pywbem.Uint16(2)
+ Predictive_Failure = pywbem.Uint16(3)
+ Non_Recoverable_Error = pywbem.Uint16(4)
+ Supporting_Entity_in_Error = pywbem.Uint16(5)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class Status(object):
+ OK = 'OK'
+ Error = 'Error'
+ Degraded = 'Degraded'
+ Unknown = 'Unknown'
+ Pred_Fail = 'Pred Fail'
+ Starting = 'Starting'
+ Stopping = 'Stopping'
+ Service = 'Service'
+ Stressed = 'Stressed'
+ NonRecover = 'NonRecover'
+ No_Contact = 'No Contact'
+ Lost_Comm = 'Lost Comm'
+ Stopped = 'Stopped'
+
+ class HealthState(object):
+ Unknown = pywbem.Uint16(0)
+ OK = pywbem.Uint16(5)
+ Degraded_Warning = pywbem.Uint16(10)
+ Minor_failure = pywbem.Uint16(15)
+ Major_failure = pywbem.Uint16(20)
+ Critical_failure = pywbem.Uint16(25)
+ Non_recoverable_error = pywbem.Uint16(30)
+ # DMTF_Reserved = ..
+ # Vendor_Specific = 32768..65535
+
+ class Classifications(object):
+ Unknown = pywbem.Uint16(0)
+ Other = pywbem.Uint16(1)
+ Driver = pywbem.Uint16(2)
+ Configuration_Software = pywbem.Uint16(3)
+ Application_Software = pywbem.Uint16(4)
+ Instrumentation = pywbem.Uint16(5)
+ Firmware_BIOS = pywbem.Uint16(6)
+ Diagnostic_Software = pywbem.Uint16(7)
+ Operating_System = pywbem.Uint16(8)
+ Middleware = pywbem.Uint16(9)
+ Firmware = pywbem.Uint16(10)
+ BIOS_FCode = pywbem.Uint16(11)
+ Support_Service_Pack = pywbem.Uint16(12)
+ Software_Bundle = pywbem.Uint16(13)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..0xFFFF
+
+ class ExtendedResourceType(object):
+ Unknown = pywbem.Uint16(0)
+ Other = pywbem.Uint16(1)
+ Not_Applicable = pywbem.Uint16(2)
+ Linux_RPM = pywbem.Uint16(3)
+ HP_UX_Depot = pywbem.Uint16(4)
+ Windows_MSI = pywbem.Uint16(5)
+ Solaris_Package = pywbem.Uint16(6)
+ Macintosh_Disk_Image = pywbem.Uint16(7)
+ Debian_linux_Package = pywbem.Uint16(8)
+ VMware_vSphere_Installation_Bundle = pywbem.Uint16(9)
+ VMware_Software_Bulletin = pywbem.Uint16(10)
+ HP_Smart_Component = pywbem.Uint16(11)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class CommunicationStatus(object):
+ Unknown = pywbem.Uint16(0)
+ Not_Available = pywbem.Uint16(1)
+ Communication_OK = pywbem.Uint16(2)
+ Lost_Communication = pywbem.Uint16(3)
+ No_Contact = pywbem.Uint16(4)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class OperationalStatus(object):
+ Unknown = pywbem.Uint16(0)
+ Other = pywbem.Uint16(1)
+ OK = pywbem.Uint16(2)
+ Degraded = pywbem.Uint16(3)
+ Stressed = pywbem.Uint16(4)
+ Predictive_Failure = pywbem.Uint16(5)
+ Error = pywbem.Uint16(6)
+ Non_Recoverable_Error = pywbem.Uint16(7)
+ Starting = pywbem.Uint16(8)
+ Stopping = pywbem.Uint16(9)
+ Stopped = pywbem.Uint16(10)
+ In_Service = pywbem.Uint16(11)
+ No_Contact = pywbem.Uint16(12)
+ Lost_Communication = pywbem.Uint16(13)
+ Aborted = pywbem.Uint16(14)
+ Dormant = pywbem.Uint16(15)
+ Supporting_Entity_in_Error = pywbem.Uint16(16)
+ Completed = pywbem.Uint16(17)
+ Power_Mode = pywbem.Uint16(18)
+ Relocating = pywbem.Uint16(19)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class OperatingStatus(object):
+ Unknown = pywbem.Uint16(0)
+ Not_Available = pywbem.Uint16(1)
+ Servicing = pywbem.Uint16(2)
+ Starting = pywbem.Uint16(3)
+ Stopping = pywbem.Uint16(4)
+ Stopped = pywbem.Uint16(5)
+ Aborted = pywbem.Uint16(6)
+ Dormant = pywbem.Uint16(7)
+ Completed = pywbem.Uint16(8)
+ Migrating = pywbem.Uint16(9)
+ Emigrating = pywbem.Uint16(10)
+ Immigrating = pywbem.Uint16(11)
+ Snapshotting = pywbem.Uint16(12)
+ Shutting_Down = pywbem.Uint16(13)
+ In_Test = pywbem.Uint16(14)
+ Transitioning = pywbem.Uint16(15)
+ In_Service = pywbem.Uint16(16)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+ class PrimaryStatus(object):
+ Unknown = pywbem.Uint16(0)
+ OK = pywbem.Uint16(1)
+ Degraded = pywbem.Uint16(2)
+ Error = pywbem.Uint16(3)
+ # DMTF_Reserved = ..
+ # Vendor_Reserved = 0x8000..
+
+@cmpi_logging.trace_function
+def object_path2pkg(op, kind='installed'):
+ """
+ @param op must contain precise information of package,
+ otherwise an error is raised
+ @param kind one of yumdb.jobs.YumGetPackageList.SUPPORTED_KINDS
+ says, where to look for given package
+ """
+ if not isinstance(kind, basestring):
+ raise TypeError("kind must be a string")
+ if not isinstance(op, pywbem.CIMInstanceName):
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "op must be an instance of CIMInstanceName")
+
+ if (not "InstanceID" in op or not op['InstanceID']):
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, "Wrong keys.")
+ instid = op['InstanceID']
+ if not instid.lower().startswith("lmi:pkg:"):
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "InstanceID must start with LMI:PKG: prefix.")
+ instid = instid[len("LMI:PKG:"):]
+ match = util.RE_NEVRA_OPT_EPOCH.match(instid)
+ if not match:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Wrong InstanceID. Expected valid nevra"
+ ' (name-[epoch:]version-release.arch): "%s".' %
+ instid)
+
+ pkglist = YumDB.get_instance().filter_packages(kind,
+ allow_duplicates=kind not in ('installed', 'avail_reinst'),
+ **util.nevra2filter(match))
+ if len(pkglist) > 0:
+ return pkglist[0]
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ 'No matching package found for InstanceID=\"%s\".' %
+ instid)
+
+@cmpi_logging.trace_function
+def pkg2model(pkg_info, keys_only=True, model=None):
+ """
+ @param model if None, will be filled with data, otherwise
+ a new instance of CIMInstance or CIMObjectPath is created
+ """
+ if not isinstance(pkg_info, PackageInfo):
+ raise TypeError("pkg must be an instance of PackageInfo")
+ if model is None:
+ model = pywbem.CIMInstanceName("LMI_SoftwareIdentity",
+ namespace="root/cimv2")
+ if not keys_only:
+ model = pywbem.CIMInstance("LMI_SoftwareIdentity", path=model)
+ if isinstance(model, pywbem.CIMInstance):
+ def _set_key(k, value):
+ """Sets the value of key property of cim instance"""
+ model[k] = value
+ model.path[k] = value #pylint: disable=E1103
+ else:
+ _set_key = model.__setitem__
+ _set_key('InstanceID', 'LMI:PKG:'+pkg_info.nevra)
+ if not keys_only:
+ #model['BuildNumber'] = pywbem.Uint16() # TODO
+ model['Caption'] = pkg_info.summary
+ #model['ClassificationDescriptions'] = ['',] # TODO
+ model['Classifications'] = [pywbem.Uint16(0)]
+ #model['CommunicationStatus'] = \
+ # self.Values.CommunicationStatus.<VAL> # TODO
+ model['Description'] = pkg_info.description
+ #model['DetailedStatus'] = self.Values.DetailedStatus.<VAL> # TODO
+ model['ElementName'] = pkg_info.nevra
+ #model['ExtendedResourceType'] = \
+ #self.Values.ExtendedResourceType.<VAL> # TODO
+ #model['HealthState'] = self.Values.HealthState.<VAL> # TODO
+ #model['IdentityInfoType'] = ['',] # TODO
+ #model['IdentityInfoValue'] = ['',] # TODO
+ if pkg_info.installed:
+ model['InstallDate'] = pywbem.CIMDateTime(
+ pkg_info.install_time)
+ model['IsEntity'] = True
+ #model['IsLargeBuildNumber'] = bool(False) # TODO
+ #model['Languages'] = ['',] # TODO
+ #model['LargeBuildNumber'] = pywbem.Uint64() # TODO
+ #model['MajorVersion'] = pywbem.Uint16() # TODO
+ #model['Manufacturer'] = '' # TODO
+ #model['MinExtendedResourceTypeBuildNumber'] = \
+ #pywbem.Uint16() # TODO
+ #model['MinExtendedResourceTypeMajorVersion'] = \
+ #pywbem.Uint16() # TODO
+ #model['MinExtendedResourceTypeMinorVersion'] = \
+ #pywbem.Uint16() # TODO
+ #model['MinExtendedResourceTypeRevisionNumber'] = \
+ #pywbem.Uint16() # TODO
+ #model['MinorVersion'] = pywbem.Uint16() # TODO
+ model['Name'] = pkg_info.name
+ try:
+ model["Epoch"] = pywbem.Uint32(int(pkg_info.epoch))
+ except ValueError:
+ cmpi_logging.logger.error('Could not convert epoch "%s"'
+ ' to integer for package \"%s\"!' % (pkg_info.epoch, pkg_info))
+ model['Version'] = pkg_info.version
+ model['Release'] = pkg_info.release
+ model['Architecture'] = pkg_info.arch
+ #model['OperatingStatus'] = \
+ #self.Values.OperatingStatus.<VAL> # TODO
+ #model['OperationalStatus'] = \
+ #[self.Values.OperationalStatus.<VAL>,] # TODO
+ #model['OtherExtendedResourceTypeDescription'] = '' # TODO
+ #model['PrimaryStatus'] = self.Values.PrimaryStatus.<VAL> # TODO
+ #model['ReleaseDate'] = pywbem.CIMDateTime() # TODO
+ #model['RevisionNumber'] = pywbem.Uint16() # TODO
+ #model['SerialNumber'] = '' # TODO
+ #model['Status'] = self.Values.Status.<VAL> # TODO
+ #model['StatusDescriptions'] = ['',] # TODO
+ #model['TargetOperatingSystems'] = ['',] # TODO
+ #model['TargetOSTypes'] = [pywbem.Uint16(),] # TODO
+ model['TargetTypes'] = ['rpm', 'yum']
+ model['VersionString'] = pkg_info.evra
+ return model
+
diff --git a/src/software/openlmi/software/core/SoftwareInstalledPackage.py b/src/software/openlmi/software/core/SoftwareInstalledPackage.py
deleted file mode 100644
index f91fb56..0000000
--- a/src/software/openlmi/software/core/SoftwareInstalledPackage.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""
-Just a common functionality related to LMI_SoftwarePackage provider.
-"""
-
-import pywbem
-
-class Values(object):
- class Update(object):
- Already_newest = pywbem.Uint16(0)
- Successful_installation = pywbem.Uint16(1)
- Failed = pywbem.Uint16(2)
-
- class CheckIntegrity(object):
- Pass = pywbem.Uint32(0)
- Not_passed = pywbem.Uint32(1)
- Error = pywbem.Uint32(2)
-
diff --git a/src/software/openlmi/software/core/SoftwarePackage.py b/src/software/openlmi/software/core/SoftwarePackage.py
deleted file mode 100644
index 0ef0d31..0000000
--- a/src/software/openlmi/software/core/SoftwarePackage.py
+++ /dev/null
@@ -1,412 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""
-Just a common functionality related to LMI_SoftwarePackage provider.
-"""
-
-import pywbem
-
-from openlmi.common import cmpi_logging
-from openlmi.software import util
-from openlmi.software.yumdb import PackageInfo, YumDB
-
-@cmpi_logging.trace_function
-def object_path2pkg(objpath, kind='installed'):
- """
- @param objpath must contain precise information of package,
- otherwise a CIM_ERR_NOT_FOUND error is raised
- @param kind one of {'installed', 'all', 'available'}
- says, where to look for given package
- """
- if not isinstance(objpath, pywbem.CIMInstanceName):
- raise TypeError("objpath must be an instance of CIMInstanceName")
- if not isinstance(kind, basestring):
- raise TypeError("kind must be a string")
- if not kind in ('installed', 'all', 'available'):
- raise ValueError('unsupported package list "%s"' % kind)
-
- if ( not objpath['Name'] or not objpath['SoftwareElementID']
- or not objpath['SoftwareElementID'].startswith(objpath['Name'])
- or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if not util.check_target_operating_system(objpath['TargetOperatingSystem']):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong target operating system.")
- if not objpath['Name'] or not objpath['Version']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- 'Both "Name" and "Version" must be given')
- match = util.RE_NEVRA_OPT_EPOCH.match(objpath['SoftwareElementID'])
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong SotwareElementID. Expected valid nevra"
- " (name-[epoch:]version-release.arch).")
- if objpath['Version'] != match.group('ver'):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Version does not match version part in SoftwareElementID.")
- pkglist = YumDB.getInstance().filter_packages(kind,
- allow_duplicates=kind != 'installed',
- **util.nevra2filter(match))
- if len(pkglist) > 0:
- return pkglist[0]
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
-
-@cmpi_logging.trace_function
-def object_path2pkg_search(objpath):
- """
- similar to object_path2pkg, but tries to find best suitable
- package matching keys
-
- If any matching package is already installed, it is returned.
- Otherwise available package with highest version is returned.
-
- @param objpath may be object of CIMInstance or CIMInstanceName and
- must contain at least \"Name\" or \"SoftwareElementID\"
- @return instance PackageInfo
- """
- if isinstance(objpath, pywbem.CIMInstance):
- def _get_key(k):
- """@return value of instance's key"""
- value = objpath.properties.get(k, None)
- if isinstance(value, pywbem.CIMProperty):
- return value.value
- if value is not None:
- return value
- cmpi_logging.logger.error(
- 'missing key "%s" in inst.props', k)
- return objpath.path[k] if k in objpath.path else None
- elif isinstance(objpath, pywbem.CIMInstanceName):
- _get_key = lambda k: objpath[k] if k in objpath else None
- else:
- raise TypeError("objpath must be either CIMInstance"
- "or CIMInstanceName")
-
- # parse and check arguments
- filters = {}
- if _get_key('SoftwareElementID'):
- match = util.RE_NEVRA_OPT_EPOCH.match(_get_key('SoftwareElementID'))
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
- "SoftwareElementID could not be parsed.")
- filters = util.nevra2filter(match)
- else:
- for k in ('name', 'epoch', 'version', 'release', 'arch'):
- ikey = k if k != 'arch' else "architecture"
- if _get_key(ikey):
- filters[k] = _get_key(ikey)
-
- if not filters:
- raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
- "Too few key values given (give at least a Name).")
- if not 'name' in filters:
- raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
- "Missing either Name or SoftwareElementID property.")
-
- pkglist = YumDB.getInstance().filter_packages('all',
- allow_duplicates=True, sort=True, **filters)
- if len(pkglist) == 0:
- cmpi_logging.logger.error(
- 'could not find any matching package in list: %s',
- [p.nevra for p in pkglist])
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
- for pkg in pkglist: # check, whether package is already installed
- if pkg.installed:
- return pkg
- cmpi_logging.logger.info(
- ( 'found multiple matching packages'
- if len(pkglist) > 1 else 'exact match found'))
- return pkglist[-1] # select highest version
-
-@cmpi_logging.trace_function
-def pkg2model(pkg, keys_only=True, model=None):
- """
- @param model if None, will be filled with data, otherwise
- a new instance of CIMInstance or CIMObjectPath is created
- """
- if not isinstance(pkg, PackageInfo):
- raise TypeError("pkg must be an instance of PackageInfo")
- if model is None:
- model = pywbem.CIMInstanceName('LMI_SoftwarePackage',
- namespace='root/cimv2')
- if not keys_only:
- model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model)
- if isinstance(model, pywbem.CIMInstance):
- def _set_key(k, value):
- """Sets the value of key property of cim instance"""
- model[k] = value
- model.path[k] = value #pylint: disable=E1103
- else:
- _set_key = model.__setitem__
- _set_key('Name', pkg.name)
- _set_key('SoftwareElementID', pkg.nevra)
- _set_key('SoftwareElementState',
- Values.SoftwareElementState.Executable
- if pkg.installed
- else Values.SoftwareElementState.Installable)
- _set_key('TargetOperatingSystem',
- pywbem.Uint16(util.get_target_operating_system()[0]))
- _set_key('Version', pkg.version)
- if not keys_only:
- model['Caption'] = pkg.summary
- model['Description'] = pkg.description
- if pkg.installed:
- model['InstallDate'] = pywbem.CIMDateTime(pkg.install_time)
- if pkg.vendor:
- model['Manufacturer'] = pkg.vendor
- model['Release'] = pkg.release
- model['Epoch'] = pywbem.Uint16(pkg.epoch)
- model["Architecture"] = pkg.arch
- model['License'] = pkg.license
- model['Group'] = pkg.group
- model['Size'] = pywbem.Uint64(pkg.size)
- return model
-
-class Values(object):
- class DetailedStatus(object):
- Not_Available = pywbem.Uint16(0)
- No_Additional_Information = pywbem.Uint16(1)
- Stressed = pywbem.Uint16(2)
- Predictive_Failure = pywbem.Uint16(3)
- Non_Recoverable_Error = pywbem.Uint16(4)
- Supporting_Entity_in_Error = pywbem.Uint16(5)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class Status(object):
- OK = 'OK'
- Error = 'Error'
- Degraded = 'Degraded'
- Unknown = 'Unknown'
- Pred_Fail = 'Pred Fail'
- Starting = 'Starting'
- Stopping = 'Stopping'
- Service = 'Service'
- Stressed = 'Stressed'
- NonRecover = 'NonRecover'
- No_Contact = 'No Contact'
- Lost_Comm = 'Lost Comm'
- Stopped = 'Stopped'
-
- class HealthState(object):
- Unknown = pywbem.Uint16(0)
- OK = pywbem.Uint16(5)
- Degraded_Warning = pywbem.Uint16(10)
- Minor_failure = pywbem.Uint16(15)
- Major_failure = pywbem.Uint16(20)
- Critical_failure = pywbem.Uint16(25)
- Non_recoverable_error = pywbem.Uint16(30)
- # DMTF_Reserved = ..
- # Vendor_Specific = 32768..65535
-
- class TargetOperatingSystem(object):
- Unknown = pywbem.Uint16(0)
- Other = pywbem.Uint16(1)
- MACOS = pywbem.Uint16(2)
- ATTUNIX = pywbem.Uint16(3)
- DGUX = pywbem.Uint16(4)
- DECNT = pywbem.Uint16(5)
- Tru64_UNIX = pywbem.Uint16(6)
- OpenVMS = pywbem.Uint16(7)
- HPUX = pywbem.Uint16(8)
- AIX = pywbem.Uint16(9)
- MVS = pywbem.Uint16(10)
- OS400 = pywbem.Uint16(11)
- OS_2 = pywbem.Uint16(12)
- JavaVM = pywbem.Uint16(13)
- MSDOS = pywbem.Uint16(14)
- WIN3x = pywbem.Uint16(15)
- WIN95 = pywbem.Uint16(16)
- WIN98 = pywbem.Uint16(17)
- WINNT = pywbem.Uint16(18)
- WINCE = pywbem.Uint16(19)
- NCR3000 = pywbem.Uint16(20)
- NetWare = pywbem.Uint16(21)
- OSF = pywbem.Uint16(22)
- DC_OS = pywbem.Uint16(23)
- Reliant_UNIX = pywbem.Uint16(24)
- SCO_UnixWare = pywbem.Uint16(25)
- SCO_OpenServer = pywbem.Uint16(26)
- Sequent = pywbem.Uint16(27)
- IRIX = pywbem.Uint16(28)
- Solaris = pywbem.Uint16(29)
- SunOS = pywbem.Uint16(30)
- U6000 = pywbem.Uint16(31)
- ASERIES = pywbem.Uint16(32)
- HP_NonStop_OS = pywbem.Uint16(33)
- HP_NonStop_OSS = pywbem.Uint16(34)
- BS2000 = pywbem.Uint16(35)
- LINUX = pywbem.Uint16(36)
- Lynx = pywbem.Uint16(37)
- XENIX = pywbem.Uint16(38)
- VM = pywbem.Uint16(39)
- Interactive_UNIX = pywbem.Uint16(40)
- BSDUNIX = pywbem.Uint16(41)
- FreeBSD = pywbem.Uint16(42)
- NetBSD = pywbem.Uint16(43)
- GNU_Hurd = pywbem.Uint16(44)
- OS9 = pywbem.Uint16(45)
- MACH_Kernel = pywbem.Uint16(46)
- Inferno = pywbem.Uint16(47)
- QNX = pywbem.Uint16(48)
- EPOC = pywbem.Uint16(49)
- IxWorks = pywbem.Uint16(50)
- VxWorks = pywbem.Uint16(51)
- MiNT = pywbem.Uint16(52)
- BeOS = pywbem.Uint16(53)
- HP_MPE = pywbem.Uint16(54)
- NextStep = pywbem.Uint16(55)
- PalmPilot = pywbem.Uint16(56)
- Rhapsody = pywbem.Uint16(57)
- Windows_2000 = pywbem.Uint16(58)
- Dedicated = pywbem.Uint16(59)
- OS_390 = pywbem.Uint16(60)
- VSE = pywbem.Uint16(61)
- TPF = pywbem.Uint16(62)
- Windows__R__Me = pywbem.Uint16(63)
- Caldera_Open_UNIX = pywbem.Uint16(64)
- OpenBSD = pywbem.Uint16(65)
- Not_Applicable = pywbem.Uint16(66)
- Windows_XP = pywbem.Uint16(67)
- z_OS = pywbem.Uint16(68)
- Microsoft_Windows_Server_2003 = pywbem.Uint16(69)
- Microsoft_Windows_Server_2003_64_Bit = pywbem.Uint16(70)
- Windows_XP_64_Bit = pywbem.Uint16(71)
- Windows_XP_Embedded = pywbem.Uint16(72)
- Windows_Vista = pywbem.Uint16(73)
- Windows_Vista_64_Bit = pywbem.Uint16(74)
- Windows_Embedded_for_Point_of_Service = pywbem.Uint16(75)
- Microsoft_Windows_Server_2008 = pywbem.Uint16(76)
- Microsoft_Windows_Server_2008_64_Bit = pywbem.Uint16(77)
- FreeBSD_64_Bit = pywbem.Uint16(78)
- RedHat_Enterprise_Linux = pywbem.Uint16(79)
- RedHat_Enterprise_Linux_64_Bit = pywbem.Uint16(80)
- Solaris_64_Bit = pywbem.Uint16(81)
- SUSE = pywbem.Uint16(82)
- SUSE_64_Bit = pywbem.Uint16(83)
- SLES = pywbem.Uint16(84)
- SLES_64_Bit = pywbem.Uint16(85)
- Novell_OES = pywbem.Uint16(86)
- Novell_Linux_Desktop = pywbem.Uint16(87)
- Sun_Java_Desktop_System = pywbem.Uint16(88)
- Mandriva = pywbem.Uint16(89)
- Mandriva_64_Bit = pywbem.Uint16(90)
- TurboLinux = pywbem.Uint16(91)
- TurboLinux_64_Bit = pywbem.Uint16(92)
- Ubuntu = pywbem.Uint16(93)
- Ubuntu_64_Bit = pywbem.Uint16(94)
- Debian = pywbem.Uint16(95)
- Debian_64_Bit = pywbem.Uint16(96)
- Linux_2_4_x = pywbem.Uint16(97)
- Linux_2_4_x_64_Bit = pywbem.Uint16(98)
- Linux_2_6_x = pywbem.Uint16(99)
- Linux_2_6_x_64_Bit = pywbem.Uint16(100)
- Linux_64_Bit = pywbem.Uint16(101)
- Other_64_Bit = pywbem.Uint16(102)
- Microsoft_Windows_Server_2008_R2 = pywbem.Uint16(103)
- VMware_ESXi = pywbem.Uint16(104)
- Microsoft_Windows_7 = pywbem.Uint16(105)
- CentOS_32_bit = pywbem.Uint16(106)
- CentOS_64_bit = pywbem.Uint16(107)
- Oracle_Enterprise_Linux_32_bit = pywbem.Uint16(108)
- Oracle_Enterprise_Linux_64_bit = pywbem.Uint16(109)
- eComStation_32_bitx = pywbem.Uint16(110)
-
- class Remove(object):
- Not_installed = pywbem.Uint32(0)
- Successful_removal = pywbem.Uint32(1)
- Failed = pywbem.Uint32(2)
-
- class CommunicationStatus(object):
- Unknown = pywbem.Uint16(0)
- Not_Available = pywbem.Uint16(1)
- Communication_OK = pywbem.Uint16(2)
- Lost_Communication = pywbem.Uint16(3)
- No_Contact = pywbem.Uint16(4)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class OperationalStatus(object):
- Unknown = pywbem.Uint16(0)
- Other = pywbem.Uint16(1)
- OK = pywbem.Uint16(2)
- Degraded = pywbem.Uint16(3)
- Stressed = pywbem.Uint16(4)
- Predictive_Failure = pywbem.Uint16(5)
- Error = pywbem.Uint16(6)
- Non_Recoverable_Error = pywbem.Uint16(7)
- Starting = pywbem.Uint16(8)
- Stopping = pywbem.Uint16(9)
- Stopped = pywbem.Uint16(10)
- In_Service = pywbem.Uint16(11)
- No_Contact = pywbem.Uint16(12)
- Lost_Communication = pywbem.Uint16(13)
- Aborted = pywbem.Uint16(14)
- Dormant = pywbem.Uint16(15)
- Supporting_Entity_in_Error = pywbem.Uint16(16)
- Completed = pywbem.Uint16(17)
- Power_Mode = pywbem.Uint16(18)
- Relocating = pywbem.Uint16(19)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class OperatingStatus(object):
- Unknown = pywbem.Uint16(0)
- Not_Available = pywbem.Uint16(1)
- Servicing = pywbem.Uint16(2)
- Starting = pywbem.Uint16(3)
- Stopping = pywbem.Uint16(4)
- Stopped = pywbem.Uint16(5)
- Aborted = pywbem.Uint16(6)
- Dormant = pywbem.Uint16(7)
- Completed = pywbem.Uint16(8)
- Migrating = pywbem.Uint16(9)
- Emigrating = pywbem.Uint16(10)
- Immigrating = pywbem.Uint16(11)
- Snapshotting = pywbem.Uint16(12)
- Shutting_Down = pywbem.Uint16(13)
- In_Test = pywbem.Uint16(14)
- Transitioning = pywbem.Uint16(15)
- In_Service = pywbem.Uint16(16)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class SoftwareElementState(object):
- Deployable = pywbem.Uint16(0)
- Installable = pywbem.Uint16(1)
- Executable = pywbem.Uint16(2)
- Running = pywbem.Uint16(3)
-
- class PrimaryStatus(object):
- Unknown = pywbem.Uint16(0)
- OK = pywbem.Uint16(1)
- Degraded = pywbem.Uint16(2)
- Error = pywbem.Uint16(3)
- # DMTF_Reserved = ..
- # Vendor_Reserved = 0x8000..
-
- class Install(object):
- Already_installed = pywbem.Uint32(0)
- Successful_installation = pywbem.Uint32(1)
- Failed = pywbem.Uint32(2)
-
diff --git a/src/software/openlmi/software/core/SoftwarePackageChecks.py b/src/software/openlmi/software/core/SoftwarePackageChecks.py
deleted file mode 100644
index 6d39294..0000000
--- a/src/software/openlmi/software/core/SoftwarePackageChecks.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""
-Just a common functionality related to LMI_SoftwarePackageChecks provider.
-"""
-
-import pywbem
-
-class Values(object):
- class Phase(object):
- In_State = pywbem.Uint16(0)
- Next_State = pywbem.Uint16(1)
diff --git a/src/software/openlmi/software/core/SystemSoftwareCollection.py b/src/software/openlmi/software/core/SystemSoftwareCollection.py
new file mode 100644
index 0000000..b7a320b
--- /dev/null
+++ b/src/software/openlmi/software/core/SystemSoftwareCollection.py
@@ -0,0 +1,63 @@
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Common utilities concerning SystemSoftwareCollection provider.
+"""
+
+import pywbem
+
+def get_path():
+ """@return instance name with prefilled properties"""
+ op = pywbem.CIMInstanceName(
+ classname="LMI_SystemSoftwareCollection",
+ namespace="root/cimv2")
+ op['InstanceID'] = "LMI:SystemSoftwareCollection"
+ return op
+
+def check_path_property(env, op, prop_name):
+ """
+ Checks, whether prop_name property of op object path is correct.
+ If not, an exception will be risen.
+ """
+ if not prop_name in op:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "Missing %s key property!" % prop_name)
+ collection = op[prop_name]
+ if not isinstance(collection, pywbem.CIMInstanceName):
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "\"%s\" must be a CIMInstanceName" % prop_name)
+ our_collection = get_path()
+ ch = env.get_cimom_handle()
+ if collection.namespace != our_collection.namespace:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ 'Namespace of "%s" does not match "%s"' % (
+ prop_name, our_collection.namespace))
+ if not ch.is_subclass(our_collection.namespace,
+ sub=collection.classname,
+ super=our_collection.classname):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Class of \"%s\" must be a sublass of %s" % (
+ prop_name, our_collection.classname))
+ if not 'InstanceID' in collection:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "\"%s\" is missing InstanceID key property", prop_name)
+ if collection['InstanceID'] != our_collection['InstanceID']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "InstanceID of \"%s\" does not match \"%s\"" %
+ prop_name, our_collection['InstanceID'])
+ return True
diff --git a/src/software/openlmi/software/core/__init__.py b/src/software/openlmi/software/core/__init__.py
index 2ebe827..61158e1 100644
--- a/src/software/openlmi/software/core/__init__.py
+++ b/src/software/openlmi/software/core/__init__.py
@@ -18,3 +18,9 @@
#
# Authors: Michal Minar <miminar@redhat.com>
#
+
+"""
+Core functionality of particular providers.
+Each provider having functionality useful to others has a submodule
+in this subpackage with the same name except for LMI_ prefix.
+"""
diff --git a/src/software/openlmi/software/util/singletonmixin.py b/src/software/openlmi/software/util/singletonmixin.py
index 8051695..7b26b8b 100644
--- a/src/software/openlmi/software/util/singletonmixin.py
+++ b/src/software/openlmi/software/util/singletonmixin.py
@@ -1,8 +1,9 @@
+#pylint: disable-all
"""
A Python Singleton mixin class that makes use of some of the ideas
found at http://c2.com/cgi/wiki?PythonSingleton. Just inherit
from it and you have a singleton. No code is required in
-subclasses to create singleton behavior -- inheritance from
+subclasses to create singleton behavior -- inheritance from
Singleton is all that is needed.
Singleton creation is threadsafe.
@@ -11,8 +12,8 @@ USAGE:
Just inherit from Singleton. If you need a constructor, include
an __init__() method in your class as you usually would. However,
-if your class is S, you instantiate the singleton using S.getInstance()
-instead of S(). Repeated calls to S.getInstance() return the
+if your class is S, you instantiate the singleton using S.get_instance()
+instead of S(). Repeated calls to S.get_instance() return the
originally-created instance.
For example:
@@ -21,8 +22,8 @@ class S(Singleton):
def __init__(self, a, b=1):
pass
-
-S1 = S.getInstance(1, b=3)
+
+S1 = S.get_instance(1, b=3)
Most of the time, that's all you need to know. However, there are some
@@ -30,46 +31,47 @@ other useful behaviors. Read on for a full description:
1) Getting the singleton:
- S.getInstance()
-
-returns the instance of S. If none exists, it is created.
+ S.get_instance()
+
+returns the instance of S. If none exists, it is created.
2) The usual idiom to construct an instance by calling the class, i.e.
S()
-
-is disabled for the sake of clarity.
-For one thing, the S() syntax means instantiation, but getInstance()
+is disabled for the sake of clarity.
+
+For one thing, the S() syntax means instantiation, but get_instance()
usually does not cause instantiation. So the S() syntax would
be misleading.
-Because of that, if S() were allowed, a programmer who didn't
+Because of that, if S() were allowed, a programmer who didn't
happen to notice the inheritance from Singleton (or who
wasn't fully aware of what a Singleton pattern
-does) might think he was creating a new instance,
+does) might think he was creating a new instance,
which could lead to very unexpected behavior.
So, overall, it is felt that it is better to make things clearer
by requiring the call of a class method that is defined in
-Singleton. An attempt to instantiate via S() will result
+Singleton. An attempt to instantiate via S() will result
in a SingletonException being raised.
3) Use __S.__init__() for instantiation processing,
-since S.getInstance() runs S.__init__(), passing it the args it has received.
+since S.get_instance() runs S.__init__(), passing it the args it has received.
-If no data needs to be passed in at instantiation time, you don't need S.__init__().
+If no data needs to be passed in at instantiation time,
+you don't need S.__init__().
4) If S.__init__(.) requires parameters, include them ONLY in the
-first call to S.getInstance(). If subsequent calls have arguments,
+first call to S.get_instance(). If subsequent calls have arguments,
a SingletonException is raised by default.
If you find it more convenient for subsequent calls to be allowed to
-have arguments, but for those argumentsto be ignored, just include
+have arguments, but for those argumentsto be ignored, just include
'ignoreSubsequent = True' in your class definition, i.e.:
class S(Singleton):
-
+
ignoreSubsequent = True
def __init__(self, a, b=1):
@@ -77,345 +79,392 @@ have arguments, but for those argumentsto be ignored, just include
5) For testing, it is sometimes convenient for all existing singleton
instances to be forgotten, so that new instantiations can occur. For that
-reason, a forgetAllSingletons() function is included. Just call
+reason, a _forget_all_singletons() function is included. Just call
+
+ _forget_all_singletons()
- forgetAllSingletons()
-
and it is as if no earlier instantiations have occurred.
-6) As an implementation detail, classes that inherit
+6) As an implementation detail, classes that inherit
from Singleton may not have their own __new__
-methods. To make sure this requirement is followed,
+methods. To make sure this requirement is followed,
an exception is raised if a Singleton subclass includ
es __new__. This happens at subclass instantiation
time (by means of the MetaSingleton metaclass.
-By Gary Robinson, grobinson@flyfi.com. No rights reserved --
+By Gary Robinson, grobinson@flyfi.com. No rights reserved --
placed in the public domain -- which is only reasonable considering
how much it owes to other people's code and ideas which are in the
-public domain. The idea of using a metaclass came from
-a comment on Gary's blog (see
-http://www.garyrobinson.net/2004/03/python_singleto.html#comments).
+public domain. The idea of using a metaclass came from
+a comment on Gary's blog (see
+http://www.garyrobinson.net/2004/03/python_singleto.html#comments).
Other improvements came from comments and email from other
people who saw it online. (See the blog post and comments
for further credits.)
Not guaranteed to be fit for any particular purpose. Use at your
-own risk.
+own risk.
"""
import threading
class SingletonException(Exception):
+ """
+ Base exception related to singleton handling.
+ """
pass
-_stSingletons = set()
-_lockForSingletons = threading.RLock()
-_lockForSingletonCreation = threading.RLock() # Ensure only one instance of each Singleton
- # class is created. This is not bound to the
- # individual Singleton class since we need to
- # ensure that there is only one mutex for each
- # Singleton class, which would require having
- # a lock when setting up the Singleton class,
- # which is what this is anyway. So, when any
- # Singleton is created, we lock this lock and
- # then we don't need to lock it again for that
- # class.
-
-def _createSingletonInstance(cls, lstArgs, dctKwArgs):
- _lockForSingletonCreation.acquire()
+_ST_SINGLETONS = set()
+_LOCK_FOR_SINGLETONS = threading.RLock()
+# Ensure only one instance of each Singleton class is created. This is not
+# bound to the _LOCK_FOR_SINGLETON_CREATION = threading.RLock() individual
+# Singleton class since we need to ensure that there is only one mutex for each
+# Singleton class, which would require having a lock when setting up the
+# Singleton class, which is what this is anyway. So, when any Singleton is
+# created, we lock this lock and then we don't need to lock it again for that
+# class.
+_LOCK_FOR_SINGLETON_CREATION = threading.RLock()
+
+def _create_singleton_instance(cls, lst_args, dct_kw_args):
+ """
+ Creates singleton instance and stores its class in set.
+ """
+ _LOCK_FOR_SINGLETON_CREATION.acquire()
try:
- if cls._isInstantiated(): # some other thread got here first
- return
-
+ if cls._is_instantiated(): # some other thread got here first
+ return
+
instance = cls.__new__(cls)
try:
- instance.__init__(*lstArgs, **dctKwArgs)
- except TypeError, e:
- if e.message.find('__init__() takes') != -1:
- raise SingletonException, 'If the singleton requires __init__ args, supply them on first call to getInstance().'
+ instance.__init__(*lst_args, **dct_kw_args)
+ except TypeError, exc:
+ if '__init__() takes' in exc.message:
+ raise SingletonException, (
+ 'If the singleton requires __init__ args,'
+ ' supply them on first call to get_instance().')
else:
raise
- cls.cInstance = instance
- _addSingleton(cls)
+ cls.c_instance = instance
+ _add_singleton(cls)
finally:
- _lockForSingletonCreation.release()
+ _LOCK_FOR_SINGLETON_CREATION.release()
-def _addSingleton(cls):
- _lockForSingletons.acquire()
+def _add_singleton(cls):
+ """
+ Adds class to singleton set.
+ """
+ _LOCK_FOR_SINGLETONS.acquire()
try:
- assert cls not in _stSingletons
- _stSingletons.add(cls)
+ assert cls not in _ST_SINGLETONS
+ _ST_SINGLETONS.add(cls)
finally:
- _lockForSingletons.release()
+ _LOCK_FOR_SINGLETONS.release()
-def _removeSingleton(cls):
- _lockForSingletons.acquire()
+def _remove_singleton(cls):
+ """
+ Removes class from singleton set.
+ """
+ _LOCK_FOR_SINGLETONS.acquire()
try:
- if cls in _stSingletons:
- _stSingletons.remove(cls)
+ if cls in _ST_SINGLETONS:
+ _ST_SINGLETONS.remove(cls)
finally:
- _lockForSingletons.release()
-
-def forgetAllSingletons():
- '''This is useful in tests, since it is hard to know which singletons need to be cleared to make a test work.'''
- _lockForSingletons.acquire()
+ _LOCK_FOR_SINGLETONS.release()
+
+def _forget_all_singletons():
+ '''
+ This is useful in tests, since it is hard to know which singletons need
+ to be cleared to make a test work.
+ '''
+ _LOCK_FOR_SINGLETONS.acquire()
try:
- for cls in _stSingletons.copy():
- cls._forgetClassInstanceReferenceForTesting()
+ for cls in _ST_SINGLETONS.copy():
+ cls._forget_class_instance_reference_for_testing()
# Might have created some Singletons in the process of tearing down.
# Try one more time - there should be a limit to this.
- iNumSingletons = len(_stSingletons)
- if len(_stSingletons) > 0:
- for cls in _stSingletons.copy():
- cls._forgetClassInstanceReferenceForTesting()
- iNumSingletons -= 1
- assert iNumSingletons == len(_stSingletons), 'Added a singleton while destroying ' + str(cls)
- assert len(_stSingletons) == 0, _stSingletons
+ i_num_singletons = len(_ST_SINGLETONS)
+ if len(_ST_SINGLETONS) > 0:
+ for cls in _ST_SINGLETONS.copy():
+ cls._forget_class_instance_reference_for_testing()
+ i_num_singletons -= 1
+ assert i_num_singletons == len(_ST_SINGLETONS), \
+ 'Added a singleton while destroying ' + str(cls)
+ assert len(_ST_SINGLETONS) == 0, _ST_SINGLETONS
finally:
- _lockForSingletons.release()
+ _LOCK_FOR_SINGLETONS.release()
class MetaSingleton(type):
- def __new__(metaclass, strName, tupBases, dct):
+ """
+ Metaclass for Singleton base class.
+ """
+ def __new__(mcs, str_name, tup_bases, dct):
if dct.has_key('__new__'):
raise SingletonException, 'Can not override __new__ in a Singleton'
- return super(MetaSingleton, metaclass).__new__(metaclass, strName, tupBases, dct)
-
- def __call__(cls, *lstArgs, **dictArgs):
- raise SingletonException, 'Singletons may only be instantiated through getInstance()'
-
+ return super(MetaSingleton, mcs).__new__(
+ mcs, str_name, tup_bases, dct)
+
+ def __call__(cls, *lst_args, **dictArgs):
+ raise SingletonException, \
+ 'Singletons may only be instantiated through get_instance()'
+
class Singleton(object):
+ """
+ Base class for all singletons.
+ """
__metaclass__ = MetaSingleton
-
- def getInstance(cls, *lstArgs, **dctKwArgs):
+
+ def get_instance(cls, *lst_args, **dct_kw_args):
"""
Call this to instantiate an instance or retrieve the existing instance.
If the singleton requires args to be instantiated, include them the first
- time you call getInstance.
+ time you call get_instance.
"""
- if cls._isInstantiated():
- if (lstArgs or dctKwArgs) and not hasattr(cls, 'ignoreSubsequent'):
- raise SingletonException, 'Singleton already instantiated, but getInstance() called with args.'
+ if cls._is_instantiated():
+ if ( (lst_args or dct_kw_args)
+ and not hasattr(cls, 'ignoreSubsequent')):
+ raise SingletonException, (
+ 'Singleton already instantiated, but get_instance()'
+ ' called with args.')
else:
- _createSingletonInstance(cls, lstArgs, dctKwArgs)
-
- return cls.cInstance
- getInstance = classmethod(getInstance)
-
- def _isInstantiated(cls):
- # Don't use hasattr(cls, 'cInstance'), because that screws things up if there is a singleton that
- # extends another singleton. hasattr looks in the base class if it doesn't find in subclass.
- return 'cInstance' in cls.__dict__
- _isInstantiated = classmethod(_isInstantiated)
+ _create_singleton_instance(cls, lst_args, dct_kw_args)
+
+ return cls.c_instance #pylint: disable=E1101
+ get_instance = classmethod(get_instance)
+
+ def _is_instantiated(cls):
+ """
+ Don't use hasattr(cls, 'c_instance'), because that screws things
+ up if there is a singleton that extends another singleton.
+ hasattr looks in the base class if it doesn't find in subclass.
+ """
+ return 'c_instance' in cls.__dict__
+ _is_instantiated = classmethod(_is_instantiated)
# This can be handy for public use also
- isInstantiated = _isInstantiated
+ isInstantiated = _is_instantiated
- def _forgetClassInstanceReferenceForTesting(cls):
+ def _forget_class_instance_reference_for_testing(cls):
"""
- This is designed for convenience in testing -- sometimes you
+ This is designed for convenience in testing -- sometimes you
want to get rid of a singleton during test code to see what
- happens when you call getInstance() under a new situation.
-
+ happens when you call get_instance() under a new situation.
+
To really delete the object, all external references to it
also need to be deleted.
"""
try:
- if hasattr(cls.cInstance, '_prepareToForgetSingleton'):
+ if hasattr(cls.c_instance, '_prepare_to_forget_singleton'):
# tell instance to release anything it might be holding onto.
- cls.cInstance._prepareToForgetSingleton()
- del cls.cInstance
- _removeSingleton(cls)
+ cls.c_instance._prepare_to_forget_singleton()
+ del cls.c_instance
+ _remove_singleton(cls)
except AttributeError:
- # run up the chain of base classes until we find the one that has the instance
- # and then delete it there
- for baseClass in cls.__bases__:
- if issubclass(baseClass, Singleton):
- baseClass._forgetClassInstanceReferenceForTesting()
- _forgetClassInstanceReferenceForTesting = classmethod(_forgetClassInstanceReferenceForTesting)
-
-
-if __name__ == '__main__':
+ # run up the chain of base classes until we find the one that has
+ # the instance and then delete it there
+ for base_class in cls.__bases__:
+ if issubclass(base_class, Singleton):
+ base_class._forget_class_instance_reference_for_testing()
+ _forget_class_instance_reference_for_testing = classmethod(
+ _forget_class_instance_reference_for_testing)
+
+
+if __name__ == '__main__':
import unittest
import time
-
- class singletonmixin_Public_TestCase(unittest.TestCase):
- def testReturnsSameObject(self):
+
+ class SingletonMixinPublicTestCase(unittest.TestCase):
+ """
+ TestCase for singleton class.
+ """
+ def testReturnsSameObject(self): #pylint: disable=C0103
"""
- Demonstrates normal use -- just call getInstance and it returns a singleton instance
+ Demonstrates normal use -- just call get_instance and it returns a singleton instance
"""
-
- class A(Singleton):
+
+ class Foo(Singleton):
+ """Singleton child class."""
def __init__(self):
- super(A, self).__init__()
-
- a1 = A.getInstance()
- a2 = A.getInstance()
+ super(Foo, self).__init__()
+
+ a1 = Foo.get_instance()
+ a2 = Foo.get_instance()
self.assertEquals(id(a1), id(a2))
-
- def testInstantiateWithMultiArgConstructor(self):
+
+ def testInstantiateWithMultiArgConstructor(self):#pylint: disable=C0103
"""
If the singleton needs args to construct, include them in the first
call to get instances.
"""
-
- class B(Singleton):
-
+
+ class Bar(Singleton):
+ """Singleton child class."""
+
def __init__(self, arg1, arg2):
- super(B, self).__init__()
+ super(Bar, self).__init__()
self.arg1 = arg1
self.arg2 = arg2
-
- b1 = B.getInstance('arg1 value', 'arg2 value')
- b2 = B.getInstance()
+
+ b1 = Bar.get_instance('arg1 value', 'arg2 value')
+ b2 = Bar.get_instance()
self.assertEquals(b1.arg1, 'arg1 value')
self.assertEquals(b1.arg2, 'arg2 value')
self.assertEquals(id(b1), id(b2))
-
+
def testInstantiateWithKeywordArg(self):
-
- class B(Singleton):
-
+ """
+ Test instantiation with keyword arguments.
+ """
+
+ class Baz(Singleton):
+ """Singleton child class."""
def __init__(self, arg1=5):
- super(B, self).__init__()
+ super(Baz, self).__init__()
self.arg1 = arg1
-
- b1 = B.getInstance('arg1 value')
- b2 = B.getInstance()
+
+ b1 = Baz.get_instance('arg1 value')
+ b2 = Baz.get_instance()
self.assertEquals(b1.arg1, 'arg1 value')
self.assertEquals(id(b1), id(b2))
-
+
def testTryToInstantiateWithoutNeededArgs(self):
-
- class B(Singleton):
-
+ """
+ This tests, improper instantiation.
+ """
+
+ class Foo(Singleton):
+ """Singleton child class."""
def __init__(self, arg1, arg2):
- super(B, self).__init__()
+ super(Foo, self).__init__()
self.arg1 = arg1
self.arg2 = arg2
-
- self.assertRaises(SingletonException, B.getInstance)
-
+
+ self.assertRaises(SingletonException, Foo.get_instance)
+
def testPassTypeErrorIfAllArgsThere(self):
"""
- Make sure the test for capturing missing args doesn't interfere with a normal TypeError.
+ Make sure the test for capturing missing args doesn't interfere
+ with a normal TypeError.
"""
- class B(Singleton):
-
+ class Bar(Singleton):
+ """Singleton child class."""
def __init__(self, arg1, arg2):
- super(B, self).__init__()
+ super(Bar, self).__init__()
self.arg1 = arg1
self.arg2 = arg2
raise TypeError, 'some type error'
-
- self.assertRaises(TypeError, B.getInstance, 1, 2)
-
+
+ self.assertRaises(TypeError, Bar.get_instance, 1, 2)
+
def testTryToInstantiateWithoutGetInstance(self):
"""
Demonstrates that singletons can ONLY be instantiated through
- getInstance, as long as they call Singleton.__init__ during construction.
-
- If this check is not required, you don't need to call Singleton.__init__().
+ get_instance, as long as they call Singleton.__init__ during
+ construction.
+
+ If this check is not required, you don't need to call
+ Singleton.__init__().
"""
-
- class A(Singleton):
+
+ class A(Singleton):
def __init__(self):
super(A, self).__init__()
-
+
self.assertRaises(SingletonException, A)
-
+
def testDontAllowNew(self):
-
+
def instantiatedAnIllegalClass():
- class A(Singleton):
+ class A(Singleton):
def __init__(self):
super(A, self).__init__()
-
- def __new__(metaclass, strName, tupBases, dct):
- return super(MetaSingleton, metaclass).__new__(metaclass, strName, tupBases, dct)
-
+
+ def __new__(metaclass, str_name, tup_bases, dct):
+ return super(MetaSingleton, metaclass).__new__(
+ metaclass, str_name, tup_bases, dct)
+
self.assertRaises(SingletonException, instantiatedAnIllegalClass)
-
-
+
+
def testDontAllowArgsAfterConstruction(self):
- class B(Singleton):
-
+ class B(Singleton):
+
def __init__(self, arg1, arg2):
super(B, self).__init__()
self.arg1 = arg1
self.arg2 = arg2
-
- B.getInstance('arg1 value', 'arg2 value')
+
+ B.get_instance('arg1 value', 'arg2 value')
self.assertRaises(SingletonException, B, 'arg1 value', 'arg2 value')
-
+
def test_forgetClassInstanceReferenceForTesting(self):
- class A(Singleton):
+ class A(Singleton):
def __init__(self):
super(A, self).__init__()
- class B(A):
+ class B(A):
def __init__(self):
super(B, self).__init__()
-
- # check that changing the class after forgetting the instance produces
- # an instance of the new class
- a = A.getInstance()
+
+ # check that changing the class after forgetting the instance
+ # produces an instance of the new class
+ a = A.get_instance()
assert a.__class__.__name__ == 'A'
- A._forgetClassInstanceReferenceForTesting()
- b = B.getInstance()
+ A._forget_class_instance_reference_for_testing()
+ b = B.get_instance()
assert b.__class__.__name__ == 'B'
-
- # check that invoking the 'forget' on a subclass still deletes the instance
- B._forgetClassInstanceReferenceForTesting()
- a = A.getInstance()
- B._forgetClassInstanceReferenceForTesting()
- b = B.getInstance()
+
+ # check that invoking the 'forget' on a subclass still deletes
+ # the instance
+ B._forget_class_instance_reference_for_testing()
+ a = A.get_instance()
+ B._forget_class_instance_reference_for_testing()
+ b = B.get_instance()
assert b.__class__.__name__ == 'B'
-
+
def test_forgetAllSingletons(self):
# Should work if there are no singletons
- forgetAllSingletons()
-
+ _forget_all_singletons()
+
class A(Singleton):
ciInitCount = 0
def __init__(self):
super(A, self).__init__()
A.ciInitCount += 1
-
- A.getInstance()
+
+ A.get_instance()
self.assertEqual(A.ciInitCount, 1)
-
- A.getInstance()
+
+ A.get_instance()
self.assertEqual(A.ciInitCount, 1)
-
- forgetAllSingletons()
- A.getInstance()
+
+ _forget_all_singletons()
+ A.get_instance()
self.assertEqual(A.ciInitCount, 2)
-
+
def test_threadedCreation(self):
- # Check that only one Singleton is created even if multiple
- # threads try at the same time. If fails, would see assert in _addSingleton
+ # Check that only one Singleton is created even if multiple threads
+ # try at the same time. If fails, would see assert in _add_singleton
class Test_Singleton(Singleton):
def __init__(self):
super(Test_Singleton, self).__init__()
-
+
class Test_SingletonThread(threading.Thread):
def __init__(self, fTargetTime):
super(Test_SingletonThread, self).__init__()
self._fTargetTime = fTargetTime
self._eException = None
-
+
def run(self):
try:
fSleepTime = self._fTargetTime - time.time()
if fSleepTime > 0:
time.sleep(fSleepTime)
- Test_Singleton.getInstance()
- except Exception, e:
- self._eException = e
-
+ Test_Singleton.get_instance()
+ except Exception, exc:
+ self._eException = exc
+
fTargetTime = time.time() + 0.1
lstThreads = []
for _ in xrange(100):
@@ -429,80 +478,83 @@ if __name__ == '__main__':
eException = t._eException
if eException:
raise eException
-
+
def testNoInit(self):
"""
Demonstrates use with a class not defining __init__
"""
-
- class A(Singleton):
+
+ class A(Singleton):
pass
-
+
#INTENTIONALLY UNDEFINED:
#def __init__(self):
# super(A, self).__init__()
-
- A.getInstance() #Make sure no exception is raised
-
+
+ A.get_instance() #Make sure no exception is raised
+
def testMultipleGetInstancesWithArgs(self):
-
+
class A(Singleton):
-
+
ignoreSubsequent = True
-
+
def __init__(self, a, b=1):
pass
-
- a1 = A.getInstance(1)
- a2 = A.getInstance(2) # ignores the second call because of ignoreSubsequent
-
+
+ a1 = A.get_instance(1)
+ # ignores the second call because of ignoreSubsequent
+ a2 = A.get_instance(2)
+
class B(Singleton):
-
+
def __init__(self, a, b=1):
pass
-
- b1 = B.getInstance(1)
- self.assertRaises(SingletonException, B.getInstance, 2) # No ignoreSubsequent included
-
+
+ b1 = B.get_instance(1)
+ # No ignoreSubsequent included
+ self.assertRaises(SingletonException, B.get_instance, 2)
+
class C(Singleton):
-
+
def __init__(self, a=1):
pass
-
- c1 = C.getInstance(a=1)
- self.assertRaises(SingletonException, C.getInstance, a=2) # No ignoreSubsequent included
-
+
+ c1 = C.get_instance(a=1)
+ # No ignoreSubsequent included
+ self.assertRaises(SingletonException, C.get_instance, a=2)
+
def testInheritance(self):
- """
+ """
It's sometimes said that you can't subclass a singleton (see, for instance,
http://steve.yegge.googlepages.com/singleton-considered-stupid point e). This
test shows that at least rudimentary subclassing works fine for us.
"""
-
+
class A(Singleton):
-
- def setX(self, x):
+
+ def set_x(self, x):
self.x = x
-
+
def setZ(self, z):
raise NotImplementedError
-
+
class B(A):
-
- def setX(self, x):
+
+ def set_x(self, x):
self.x = -x
-
- def setY(self, y):
+
+ def set_y(self, y):
self.y = y
-
- a = A.getInstance()
- a.setX(5)
- b = B.getInstance()
- b.setX(5)
- b.setY(50)
+
+ a = A.get_instance()
+ a.set_x(5)
+ b = B.get_instance()
+ b.set_x(5)
+ b.set_y(50)
self.assertEqual((a.x, b.x, b.y), (5, -5, 50))
- self.assertRaises(AttributeError, eval, 'a.setY', {}, locals())
+ self.assertRaises(AttributeError, eval, 'a.set_y', {}, locals())
self.assertRaises(NotImplementedError, b.setZ, 500)
unittest.main()
-
+
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py
index 0dc9050..dc77689 100644
--- a/src/software/openlmi/software/yumdb/__init__.py
+++ b/src/software/openlmi/software/yumdb/__init__.py
@@ -235,21 +235,23 @@ class YumDB(singletonmixin.Singleton):
"""
Shut down the YumWorker process.
"""
- cmpi_logging.logger.info('YumDB: cleanup called')
- if self._process:
+ if self._process is not None:
cmpi_logging.logger.info('YumDB: terminating YumWorker')
self._process.uplink.put(None) # terminating command
self._process.uplink.join()
self._process.join()
cmpi_logging.logger.info('YumDB: YumWorker terminated')
self._process = None
+ else:
+ cmpi_logging.logger.warn("YunDB: clean_up called, when process"
+ " not initialized!")
@cmpi_logging.trace_method
def get_package_list(self, kind,
allow_duplicates=False,
sort=False):
"""
- @param kind is one of: {"installed", "available", "all"}
+ @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS
@param allow_duplicates says, whether to list all found versions
of single package
@return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo
diff --git a/src/software/openlmi/software/yumdb/jobs.py b/src/software/openlmi/software/yumdb/jobs.py
index 1fdfbe3..f280a11 100644
--- a/src/software/openlmi/software/yumdb/jobs.py
+++ b/src/software/openlmi/software/yumdb/jobs.py
@@ -100,7 +100,18 @@ class YumGetPackageList(YumJob): #pylint: disable=R0903
"""
Job requesing a list of packages.
Arguments:
- kind - supported values are in SUPPORTED_KINDS tuple
+ kind - supported values are in SUPPORTED_KINDS tuple
+ * installed lists all installed packages; more packages with
+ the same name can be installed varying in their architecture
+ * avail_notinst lists all available, not installed packages;
+ allow_duplicates must be True to include older packages (but still
+ available)
+ * avail_reinst lists all installed packages, that are available;
+ package can be installed, but not available anymore due to updates
+ of repository, where only the newest packages are kept
+ * available lists a union of avail_notinst and avail_reinst
+ * all lists union of installed and avail_notinst
+
allow_duplicates - whether multiple packages can be present
in result for single (name, arch) of package differing
in their version
@@ -109,7 +120,8 @@ class YumGetPackageList(YumJob): #pylint: disable=R0903
"""
__slots__ = ('kind', 'allow_duplicates', 'sort')
- SUPPORTED_KINDS = ('installed', 'available', 'all')
+ SUPPORTED_KINDS = ( 'installed', 'available', 'avail_reinst'
+ , 'avail_notinst', 'all')
def __init__(self, kind, allow_duplicates, sort=False):
YumJob.__init__(self)
diff --git a/src/software/openlmi/software/yumdb/process.py b/src/software/openlmi/software/yumdb/process.py
index c247de7..cd70407 100644
--- a/src/software/openlmi/software/yumdb/process.py
+++ b/src/software/openlmi/software/yumdb/process.py
@@ -339,7 +339,7 @@ class YumWorker(Process):
_logger().warn("lookup of package %s with id=%d failed, trying"
" to query database", pkg, pkg.pkgid)
result = self._handle_filter_packages(
- 'installed' if pkg.installed else 'available',
+ 'installed' if pkg.installed else 'avail_reinst',
allow_duplicates=False,
sort=False,
transform=False,
@@ -428,11 +428,23 @@ class YumWorker(Process):
or original ones
@return [pkg1, pkg2, ...]
"""
- pkglist = self._yum_base.doPackageLists(kind, showdups=allow_duplicates)
- if kind == 'all':
- result = pkglist.available + pkglist.installed
+ if kind == 'avail_notinst':
+ what = 'available'
+ elif kind == 'available':
+ what = 'all'
+ elif kind == 'avail_reinst':
+ what = 'all'
else:
- result = getattr(pkglist, kind)
+ what = kind
+ pkglist = self._yum_base.doPackageLists(what, showdups=allow_duplicates)
+ if kind == 'all':
+ result = pkglist.available + pkglist.installed
+ elif kind == 'available':
+ result = pkglist.available + pkglist.reinstall_available
+ elif kind == 'avail_reinst':
+ result = pkglist.reinstall_available
+ else: # get installed or available
+ result = getattr(pkglist, what)
if sort is True:
result.sort()
_logger().debug("returning %s packages", len(result))
diff --git a/src/software/test/common.py b/src/software/test/base.py
index d91f236..4e56f46 100644
--- a/src/software/test/common.py
+++ b/src/software/test/base.py
@@ -21,61 +21,15 @@
Common utilities and base class for all software tests.
"""
+import itertools
import os
import pywbem
-import re
import tempfile
import unittest
-from subprocess import call, check_output
+from subprocess import check_output
import rpmcache
-RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-
-def remove_pkg(pkg, *args):
- """
- Remove package with rpm command.
- @param pkg is either instance of Package or package name
- @param args is a list of parameters for rpm command
- """
- if isinstance(pkg, rpmcache.Package):
- pkg = pkg.name
- call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
-
-def install_pkg(pkg, newer=True, repolist=[]):
- """
- Install a specific package.
- @param pkg is either package name or instance of Package
- In latter case, a specific version is installed.
- @param repolist is a list of repositories, that should be
- used for downloading, if using yum
- when empty, all enabled repositories are used
- """
- if isinstance(pkg, rpmcache.Package):
- try:
- rpm_name = rpmcache.get_rpm_name(pkg, newer=newer)
- call(["rpm", "--quiet", "-i", rpm_name])
- return
- except rpmcache.MissingRPM:
- pass
- pkg = pkg.name
- rpmcache.run_yum('-q', '-y', 'install', pkg, repolist=repolist)
-
-def is_installed(pkg, newer=True):
- """
- Check, whether package is installed.
- Accepts the same parameters as install_pkg.
- @see install_pkg
- """
- return rpmcache.is_installed(pkg, newer)
-
-def verify_pkg(name):
- """
- @return output of command rpm, with verification output for package
- """
- return call(["rpm", "--quiet", "-Va", name]) == 0
-
def mark_dangerous(method):
"""
Decorator for methods of unittest.TestCase subclasses, that
@@ -86,21 +40,17 @@ def mark_dangerous(method):
else:
return unittest.skip("This test is marked as dangerous.")(method)
-def is_config_file(pkg, file_path):
- """
- @return True, if file_path is a configuration file of package pkg.
- """
- cmd = ['rpm', '-qc', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
-
-def is_doc_file(pkg, file_path):
+def mark_tedious(method):
"""
- @return True, if file_path is a documentation file of package pkg.
+ Decorator for methods of unittest.TestCase subclasses, that
+ skips tedious tests. Those running for very long time and usually
+ need a lot of memory. Environment variable "LMI_RUN_TEDIOUS" can
+ allow them
"""
- cmd = ['rpm', '-qd', pkg.name]
- out = check_output(cmd)
- return file_path in set(out.splitlines()) #pylint: disable=E1103
+ if os.environ.get('LMI_RUN_TEDIOUS', '0') == '1':
+ return method
+ else:
+ return unittest.skip("This test is marked as tedious.")(method)
def get_pkg_files(pkg):
"""
@@ -124,9 +74,9 @@ def get_pkg_files(pkg):
elif len(symlinks) == 0 and os.path.islink(fpath):
symlinks.add(fpath)
elif not os.path.islink(fpath) and os.path.isfile(fpath):
- if len(configs) == 0 and is_config_file(pkg, fpath):
+ if len(configs) == 0 and rpmcache.has_pkg_config_file(pkg, fpath):
configs.add(fpath)
- elif len(docs) == 0 and is_doc_file(pkg, fpath):
+ elif len(docs) == 0 and rpmcache.has_pkg_doc_file(pkg, fpath):
docs.add(fpath)
elif len(files) == 0:
files.add(fpath)
@@ -142,6 +92,27 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
CLASS_NAME = "Define in subclass"
+ # will be filled when first needed
+ # it's a dictionary with items (pkg_name, [file_path1, ...])
+ PKGDB_FILES = None
+
+ @classmethod
+ def get_pkgdb_files(cls):
+ """
+ @return dictionary { pkg_name: ["file_path1, ...] }
+ """
+ if cls.PKGDB_FILES is not None:
+ return cls.PKGDB_FILES
+ SoftwareBaseTestCase.PKGDB_FILES = res = dict(
+ (pkg.name, get_pkg_files(pkg)) for pkg in itertools.chain(
+ cls.safe_pkgs, cls.dangerous_pkgs))
+ return res
+
+ @classmethod
+ def needs_pkgdb_files(cls):
+ """subclass may override this, if it needs PKGDB_FILES database"""
+ return False
+
def __init__(self, *args, **kwargs):
unittest.TestCase.__init__(self, *args, **kwargs)
self.longMessage = True
@@ -152,9 +123,20 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
namespace="root/cimv2", classname=self.CLASS_NAME)
def install_pkg(self, pkg, newer=True, repolist=None):
+ """
+ Use this method instead of function in rpmcache in tests.
+ """
+ if repolist is None:
+ repolist = self.test_repos
+ return rpmcache.install_pkg(pkg, newer, repolist)
+
+ def ensure_pkg_installed(self, pkg, newer=True, repolist=None):
+ """
+ Use this method instead of function in rpmcache in tests.
+ """
if repolist is None:
repolist = self.test_repos
- return install_pkg(pkg, newer, repolist)
+ return rpmcache.ensure_pkg_installed(pkg, newer, repolist)
def assertIsSubclass(self, cls, base_cls): #pylint: disable=C0103
"""
@@ -170,19 +152,17 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
"root/cimv2", base_cls, cls))
def assertEqual(self, fst, snd, *args, **kwargs):
+ """
+ Modify assertEqual for instance names comparing only important
+ properties.
+ """
if ( isinstance(fst, pywbem.CIMInstanceName)
and isinstance(snd, pywbem.CIMInstanceName)
- and fst.classname == "LMI_SoftwarePackage"
and fst.classname == snd.classname
and fst.namespace == snd.namespace
- and fst.keys() == snd.keys()
- and all(fst[k] == snd[k] for k in ("Name", "SoftwareElementID",
- "SoftwareElementState", "Version"))
- and isinstance(fst["TargetOperatingSystem"], (int, long))
- and isinstance(snd["TargetOperatingSystem"], (int, long))):
+ and fst.keybindings == snd.keybindings):
return True
- return unittest.TestCase.assertEqual(
- self, fst, snd, *args, **kwargs)
+ unittest.TestCase.assertEqual(self, fst, snd, *args, **kwargs)
@classmethod
def setUpClass(cls):
@@ -193,11 +173,8 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
cls.conn = pywbem.WBEMConnection(cls.url, (cls.user, cls.password))
cls.run_dangerous = (
os.environ.get('LMI_RUN_DANGEROUS', '0') == '1')
- cls.test_repos = os.environ.get('LMI_SOFTWARE_TEST_REPOS', '')
- if not cls.test_repos:
- cls.test_repos = []
- else:
- cls.test_repos = cls.test_repos.split(',')
+ cls.test_repos = os.environ.get(
+ 'LMI_SOFTWARE_TEST_REPOS', '').split(',')
use_cache = os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'
cls.cache_dir = None
if use_cache:
@@ -210,19 +187,23 @@ class SoftwareBaseTestCase(unittest.TestCase): #pylint: disable=R0904
os.makedirs(cls.cache_dir)
# rpm packages are expected to be in CWD
os.chdir(cls.cache_dir)
- cls.pkgdb = rpmcache.get_pkg_database(use_cache=use_cache)
- for pkg in cls.pkgdb:
- if not is_installed(pkg.name):
- install_pkg(pkg, repolist=cls.test_repos)
- cls.pkg_files = dict((pkg.name, get_pkg_files(pkg))
- for pkg in cls.pkgdb)
+ cls.safe_pkgs, cls.dangerous_pkgs = rpmcache.get_pkg_database(
+ use_cache=use_cache,
+ dangerous=cls.run_dangerous,
+ repolist=cls.test_repos,
+ cache_dir=cls.cache_dir if use_cache else None)
+ for pkg in cls.dangerous_pkgs:
+ if not rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.install_pkg(pkg, repolist=cls.test_repos)
+ if cls.needs_pkgdb_files():
+ cls.pkgdb_files = cls.get_pkgdb_files()
@classmethod
def tearDownClass(cls):
if cls.run_dangerous:
- for pkg in cls.pkgdb:
- if is_installed(pkg.name):
- remove_pkg(pkg.name)
+ for pkg in cls.dangerous_pkgs:
+ if rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.remove_pkg(pkg.name)
if hasattr(cls, "prev_dir"):
os.chdir(cls.prev_dir)
diff --git a/src/software/test/package.py b/src/software/test/package.py
new file mode 100644
index 0000000..997ee34
--- /dev/null
+++ b/src/software/test/package.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+"""
+Abstraction for RPM package for test purposes.
+"""
+
+import util
+
+class Package(object): #pylint: disable=R0902
+ """
+ Element of test package database. It's a container for package
+ informations. It contains two sets of versions for single package.
+ That's meant for updating tests.
+ """
+ def __init__(self, name, epoch, ver, rel, arch, repo,
+ **kwargs):
+ """
+ Arguments prefixed with 'up_' are for newer package.
+ """
+ self._name = name
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ self._epoch = epoch
+ self._ver = ver
+ self._rel = rel
+ self._arch = arch
+ self._repo = repo
+ safe = kwargs.get('safe', False)
+ self._up_epoch = epoch if safe else kwargs.get('up_epoch', epoch)
+ self._up_ver = ver if safe else kwargs.get('up_ver' , ver)
+ self._up_rel = rel if safe else kwargs.get('up_rel' , rel)
+ self._up_repo = repo if safe else kwargs.get('up_repo', repo)
+
+ def __str__(self):
+ return self.get_nevra()
+
+ @property
+ def name(self): return self._name #pylint: disable=C0111,C0321
+ @property
+ def epoch(self): return self._epoch #pylint: disable=C0111,C0321
+ @property
+ def ver(self): return self._ver #pylint: disable=C0111,C0321
+ @property
+ def rel(self): return self._rel #pylint: disable=C0111,C0321
+ @property
+ def arch(self): return self._arch #pylint: disable=C0111,C0321
+ @property
+ def repo(self): return self._repo #pylint: disable=C0111,C0321
+ @property
+ def nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(False)
+ @property
+ def evra(self): #pylint: disable=C0111,C0321
+ return self.get_evra(False)
+
+ @property
+ def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
+ @property
+ def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
+ @property
+ def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
+ @property
+ def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
+ @property
+ def up_nevra(self): #pylint: disable=C0111,C0321
+ return self.get_nevra(True)
+ @property
+ def up_evra(self): #pylint: disable=C0111,C0321
+ return self.get_evra(True)
+
+ @property
+ def is_safe(self):
+ """
+ @return True if properties prefixed with up_ matches those without
+ it. In that case a package is suited for non-dangerous tests.
+ """
+ return all( getattr(self, a) == getattr(self, 'up_' + a)
+ for a in ('epoch', 'ver', 'rel', 'repo'))
+
+ def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
+ """
+ @newer if True, evr part is made from properties prefixed with 'up_'
+ @return pkg nevra string
+ """
+ if newer:
+ attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
+ else:
+ attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
+ return util.make_nevra(*[getattr(self, '_'+a) for a in attrs],
+ with_epoch=with_epoch)
+
+ def get_evra(self, newer=True):
+ """
+ @newer if True, evr part is made from properties prefixed with 'up_'
+ @return pkg nevra string
+ """
+ attrs = [('up_' if newer else '')+a for a in ('epoch', 'ver', 'rel')]
+ attrs.append('arch')
+ return util.make_evra(*[getattr(self, '_'+a) for a in attrs])
+
diff --git a/src/software/test/rpmcache.py b/src/software/test/rpmcache.py
index 9a0e98a..c2dde24 100644
--- a/src/software/test/rpmcache.py
+++ b/src/software/test/rpmcache.py
@@ -18,49 +18,59 @@
#
# Authors: Radek Novacek <rnovacek@redhat.com>
# Authors: Michal Minar <miminar@redhat.com>
+
"""
Creation and manipulation utilities with rpm cache for software tests.
"""
+
import copy
import datetime
import os
import pickle
+import random
import re
from collections import defaultdict
from subprocess import call, check_output, CalledProcessError
+from package import Package
+import util
+
DB_BACKUP_FILE = 'lmi_software_test_cache'
RE_AVAIL_PKG = re.compile(
r'^(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s+'
- r'(?P<repository>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s+'
+ r'(?P<repo>[a-zA-Z0-9_-]+)\s*$', re.MULTILINE)
# this won't match the last entry, unless "package\n" is not appended
# at the end of the string
RE_PKG_DEPS = re.compile(
r'^package:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s+(?P<dep_list>.*?)'
r'(?=^package|\Z)', re.MULTILINE | re.DOTALL)
RE_DEPS_PROVIDERS = re.compile(
r'^\s*dependency:\s*(?P<dependency>.+?)\s*'
- r'(?P<providers>(^\s+provider:.+?$)+)', re.MULTILINE | re.IGNORECASE)
+ r'(?P<providers>(^\s+unsatisfied\s+dependency$)|(^\s+provider:.+?$)+)',
+ re.MULTILINE | re.IGNORECASE)
+RE_DEPS_UNSATISFIED = re.compile(r'^\s+unsatisfied\s+dep.*$', re.IGNORECASE)
RE_PROVIDER = re.compile(
r'^\s+provider:\s*(?P<name>[^\s]+)\.(?P<arch>[a-zA-Z0-9_]+)'
- r'\s+(?P<epoch>([0-9]+:)?)(?P<version>[a-zA-Z0-9._+-]+)'
- r'-(?P<release>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
+ r'\s+((?P<epoch>[0-9]+):)?(?P<ver>[a-zA-Z0-9._+-]+)'
+ r'-(?P<rel>[a-zA-Z0-9_.]+)\s*$', re.IGNORECASE | re.MULTILINE)
RE_PKG_INFO = re.compile(
r'^Name\s*:\s*(?P<name>[^\s]+).*?'
r'^(Epoch\s*:\s*(?P<epoch>[0-9]+)\s+)?'
- r'^Version\s*:\s*(?P<version>[a-zA-Z0-9._+-]+)\s+'
- r'^Release\s*:\s*(?P<release>[^\s]+)\s+.*?'
+ r'^Version\s*:\s*(?P<ver>[a-zA-Z0-9._+-]+)\s+'
+ r'^Release\s*:\s*(?P<rel>[^\s]+)\s+.*?'
r'^Size\s*:\s*(?P<size>\d+(\.\d+)?)( *(?P<units>[kMG]))?',
re.MULTILINE | re.DOTALL | re.IGNORECASE)
RE_REPO = re.compile(
r'(?:^\*?)(?P<name>[^\s/]+\b)(?!\s+id)', re.MULTILINE | re.IGNORECASE)
-# maximum number of packages, that will be selected for testing
+# Maximum number of packages, that will be selected for testing / 2
+# There are 2 sets of test packages (safe and dangerous). When running
+# in dangerous mode, the resulting number will be twice as much.
MAX_PKG_DB_SIZE = 10
# step used to iterate over package names used to check for thery dependencies
# it's a number of packages, that will be passed to yum command at once
@@ -78,104 +88,6 @@ class MissingRPM(InvalidTestCache):
InvalidTestCache.__init__(self,
"Missing package '%s' in test cache!"%pkg_name)
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
- """
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
- """
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr):
- estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
-
-def run_yum(*params, **kwargs):
- """
- Runs yum with params and returns its output
- It's here especially to allow pass a repolist argument, that
- specifies list of repositories, to run the command on.
- """
- cmd = ['yum'] + list(params)
- repolist = kwargs.get('repolist', [])
- if repolist:
- cmd += ['--disablerepo=*']
- cmd += ['--enablerepo='+r for r in repolist]
- try:
- return check_output(cmd)
- except Exception:
- import pdb;pdb.set_trace()
-
-class Package(object): #pylint: disable=R0902
- """
- Element of test package database. It's a container for package
- informations. It contains two sets of versions for single package.
- That's meant for updating tests.
- """
- def __init__(self, name, epoch, ver, rel, arch, repo,
- up_epoch, up_ver, up_rel, up_repo):
- """
- Arguments prefixed with 'up_' are for newer package.
- """
- self._name = name
- self._epoch = epoch
- self._ver = ver
- self._rel = rel
- self._arch = arch
- self._repo = repo
- self._up_epoch = up_epoch
- self._up_ver = up_ver
- self._up_rel = up_rel
- self._up_repo = up_repo
-
- def __str__(self):
- return self.get_nevra()
-
- @property
- def name(self): return self._name #pylint: disable=C0111,C0321
- @property
- def epoch(self): return self._epoch #pylint: disable=C0111,C0321
- @property
- def ver(self): return self._ver #pylint: disable=C0111,C0321
- @property
- def rel(self): return self._rel #pylint: disable=C0111,C0321
- @property
- def arch(self): return self._arch #pylint: disable=C0111,C0321
- @property
- def repo(self): return self._repo #pylint: disable=C0111,C0321
- @property
- def nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(True)
-
- @property
- def up_epoch(self): return self._up_epoch #pylint: disable=C0111,C0321
- @property
- def up_ver(self): return self._up_ver #pylint: disable=C0111,C0321
- @property
- def up_rel(self): return self._up_rel #pylint: disable=C0111,C0321
- @property
- def up_repo(self): return self._up_repo #pylint: disable=C0111,C0321
- @property
- def up_nevra(self): #pylint: disable=C0111,C0321
- return self.get_nevra(True)
-
- def get_nevra(self, newer=True, with_epoch='NOT_ZERO'):
- """
- @newer if True, evr part is made from properties prefixed with 'up_'
- @return pkg nevra string
- """
- if newer:
- attrs = ['name', 'up_epoch', 'up_ver', 'up_rel', 'arch']
- else:
- attrs = ['name', 'epoch', 'ver', 'rel', 'arch']
- return make_nevra(*[getattr(self, '_'+a) for a in attrs],
- with_epoch=with_epoch)
-
def _match_nevr(match):
"""
@param match is a regexp match object with parsed rpm package
@@ -183,22 +95,33 @@ def _match_nevr(match):
"""
epoch = match.group('epoch')
return ( match.group('name')
- , epoch[:-1] if epoch else "0"
- , match.group('version')
- , match.group('release'))
+ , epoch if epoch and epoch.lower() != "(none)" else "0"
+ , match.group('ver')
+ , match.group('rel'))
-def _match2pkg(match):
+def _match2pkg(match, safe=False):
"""
@param match is a regexp match object with attributes:
name, epoch, version, release, arch
+ @param safe if True, the packe will have up_* properties equal to
+ non-prefixed ones, otherwise they will be set to None
@return instance of Package
"""
+ kwargs = {}
+ if safe is True:
+ kwargs['safe'] = True
+ else:
+ for attr in ('epoch', 'ver', 'rel', 'repo'):
+ kwargs['up_'+attr] = None
+ epoch = match.group('epoch')
+ if not epoch or epoch.lower() == "(none)":
+ epoch = '0'
return Package(
match.group('name'),
- match.group('epoch')[:-1] if match.group('epoch') else '0',
- match.group('version'), match.group('release'),
- match.group('arch'), match.groupdict().get('repository', None),
- None, None, None, None)
+ epoch,
+ match.group('ver'), match.group('rel'),
+ match.group('arch'), match.groupdict().get('repo', None),
+ **kwargs)
def _filter_duplicates(installed, avail_str):
"""
@@ -213,7 +136,7 @@ def _filter_duplicates(installed, avail_str):
dups_list = []
cur_package_matches = []
prev_match = None
- system_arch = get_system_architecture()
+ system_arch = util.get_system_architecture()
for match in RE_AVAIL_PKG.finditer(avail_str):
if ( _match_nevr(match) in [ _match_nevr(m)
for m in cur_package_matches]
@@ -246,12 +169,14 @@ def _check_single_pkg_deps(
"""
for match_deps in RE_DEPS_PROVIDERS.finditer(dependencies_str):
providers = []
+ if RE_DEPS_UNSATISFIED.search(match_deps.group('providers')):
+ return False
for match_dep in RE_PROVIDER.finditer(match_deps.group('providers')):
if match_dep.group('name') not in installed:
continue
providers.append(_match2pkg(match_dep))
for provider in providers:
- if is_installed(provider, False):
+ if is_pkg_installed(provider, False):
break
else: # no provider is installed
return False
@@ -261,7 +186,7 @@ def _check_pkg_dependencies(
installed,
dup_list,
number_of_packages=MAX_PKG_DB_SIZE,
- repolist=[]):
+ repolist=None):
"""
Finds packages from dup_list with satisfied (installed) dependencies.
@param installed is a set of installed package names
@@ -274,7 +199,7 @@ def _check_pkg_dependencies(
yum_params = yum_params[:1]
for dups in dups_part:
yum_params.extend([d.get_nevra(newer=False) for d in dups])
- deplist_str = run_yum(*yum_params, repolist=repolist)
+ deplist_str = util.run_yum(*yum_params, repolist=repolist)
matches = RE_PKG_DEPS.finditer(deplist_str)
prev_match = None
for pkgs in dups_part:
@@ -300,14 +225,14 @@ def _check_pkg_dependencies(
break
return dups_no_deps
-def _sorted_db_by_size(pkgdb, repolist=[]):
+def _sorted_db_by_size(pkgdb, repolist=None):
"""
@param pkgdb is a list of lists of packages with common name
@return sorted instances of Package according to their size
"""
yum_params = ['info', '--showduplicates']
yum_params.extend([ps[0].name for ps in pkgdb])
- info_str = run_yum(*yum_params, repolist=repolist)
+ info_str = util.run_yum(*yum_params, repolist=repolist)
pkg_sizes = {}
# to get correct ordering from "yum info" command
# { pkg_name : [(epoch, version, release), ... ] }
@@ -329,7 +254,7 @@ def _sorted_db_by_size(pkgdb, repolist=[]):
if not epoch:
epoch = "0"
pkg_version_order[pkg_name].append((
- epoch, info_match.group('version'), info_match.group('release')))
+ epoch, info_match.group('ver'), info_match.group('rel')))
pkgdb = sorted(pkgdb, key=lambda pkgs: pkg_sizes[pkgs[0].name])[
:MAX_PKG_DB_SIZE]
@@ -350,7 +275,7 @@ def _get_repo_list():
repos_str = check_output(['yum', 'repolist', '-q'])
return RE_REPO.findall(repos_str)
-def _download_pkgdb(repolist, pkgdb, cache_dir=None):
+def _download_dangerous(repolist, pkgdb, cache_dir=None):
"""
Downloads all rpm packages (old and newer versions) from package database
to current directory.
@@ -385,32 +310,6 @@ def _make_rpm_path(pkg, cache_dir='', newer=True, without_epoch=False):
with_epoch='NEVER' if without_epoch else 'NOT_ZERO')
return os.path.join(cache_dir, nevra) + '.rpm'
-def is_installed(pkg, newer=True):
- """
- Check, whether package is installed.
- """
- if not isinstance(pkg, Package):
- return call(["rpm", "--quiet", "-q", pkg]) == 0
- else:
- try:
- cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}", pkg.get_nevra(
- newer, with_epoch='NEVER') ]
- out = check_output(cmd)
- epoch, nvra = out.split(':') #pylint: disable=E1103
- if not epoch or epoch == "(none)":
- epoch = "0"
- return ( epoch == pkg.epoch
- and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER"))
- except CalledProcessError:
- return False
-
-def rpm_exists(pkg, cache_dir='', newer=True):
- """
- @return True, when rpm package is in cache.
- """
- return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
- or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
-
def get_rpm_name(pkg, cache_dir='', newer=True):
"""
Some packages do not have epoch in their name, even if it's higher than
@@ -425,56 +324,126 @@ def get_rpm_name(pkg, cache_dir='', newer=True):
return path
raise MissingRPM(pkg.name)
-def get_system_architecture():
+def rpm_exists(pkg, cache_dir='', newer=True):
+ """
+ @return True, when rpm package is in cache.
+ """
+ return ( os.path.exists(_make_rpm_path(pkg, cache_dir, newer))
+ or os.path.exists(_make_rpm_path(pkg, cache_dir, newer, True)))
+
+
+def remove_pkg(pkg, *args):
+ """
+ Remove package with rpm command.
+ @param pkg is either instance of Package or package name
+ @param args is a list of parameters for rpm command
+ """
+ if isinstance(pkg, Package):
+ pkg = pkg.name
+ call(["rpm", "--quiet"] + list(args) + ["-e", pkg])
+
+def install_pkg(pkg, newer=True, repolist=None):
"""
- @return the system architecture name as seen by rpm
+ Install a specific package.
+ @param pkg is either package name or instance of Package
+ In latter case, a specific version is installed.
+ @param repolist is a list of repositories, that should be
+ used for downloading, if using yum
+ when empty, all enabled repositories are used
"""
- return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm'])
+ if isinstance(pkg, Package):
+ try:
+ rpm_name = get_rpm_name(pkg, newer=newer)
+ call(["rpm", "--quiet", "-i", rpm_name])
+ return
+ except MissingRPM:
+ pass
+ pkg = pkg.name
+ util.run_yum('-q', '-y', 'install', pkg, repolist=repolist)
+
+def ensure_pkg_installed(pkg, newer=True, repolist=None):
+ """
+ Ensures, that specific version of package is installed. If other
+ version is installed, it is removed and reinstalled.
+ """
+ if not isinstance(pkg, Package):
+ raise TypeError("pkg must be a Package instance")
+ if not is_pkg_installed(pkg, newer):
+ if is_pkg_installed(pkg.name):
+ remove_pkg(pkg.name)
+ install_pkg(pkg, newer, repolist)
-def write_pkgdb(pkgdb, cache_dir=''):
+def verify_pkg(pkg):
+ """
+ @return output of command rpm, with verification output for package
+ """
+ if isinstance(pkg, basestring):
+ name = pkg
+ elif isinstance(pkg, Package):
+ name = pkg.name
+ else:
+ raise TypeError("pkg must be either package name or Package instance")
+ return call(["rpm", "--quiet", "-Va", name]) == 0
+
+def has_pkg_config_file(pkg, file_path):
+ """
+ @return True, if file_path is a configuration file of package pkg.
+ """
+ cmd = ['rpm', '-qc', pkg.name]
+ out = check_output(cmd)
+ return file_path in set(out.splitlines()) #pylint: disable=E1103
+
+def has_pkg_doc_file(pkg, file_path):
+ """
+ @return True, if file_path is a documentation file of package pkg.
+ """
+ cmd = ['rpm', '-qd', pkg.name]
+ out = check_output(cmd)
+ return file_path in set(out.splitlines()) #pylint: disable=E1103
+
+def is_pkg_installed(pkg, newer=True):
+ """
+ Check, whether package is installed.
+ """
+ if not isinstance(pkg, Package):
+ return call(["rpm", "--quiet", "-q", pkg]) == 0
+ else:
+ cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", pkg.get_nevra(
+ newer, with_epoch='NEVER') ]
+ try:
+ out = check_output(cmd).splitlines()[0]
+ epoch, nvra = out.split(':') #pylint: disable=E1103
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ return ( epoch == getattr(pkg, 'up_epoch' if newer else 'epoch')
+ and nvra == pkg.get_nevra(newer=newer, with_epoch="NEVER"))
+ except CalledProcessError:
+ return False
+
+def write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir=''):
"""
Writes package database into a file named DB_BACKUP_FILE.
"""
with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'w') as db_file:
- pickle.dump((datetime.datetime.now(), pkgdb), db_file)
+ data = (datetime.datetime.now(), safe_pkgs, dangerous_pkgs)
+ pickle.dump(data, db_file)
def load_pkgdb(cache_dir=''):
"""
This is inverse function to _write_pkgdb().
- @return package database loaded from file
+ @return (safe, dangerous) package lists loaded from file
"""
with open(os.path.join(cache_dir, DB_BACKUP_FILE), 'r') as db_file:
- date_time, pkgdb = pickle.load(db_file)
+ _, safe, dangerous = pickle.load(db_file)
#print "Loaded package database from: %s" % date_time
- return pkgdb
+ return safe, dangerous
-def get_pkg_database(force_update=False, use_cache=True,
- cache_dir='',
- repolist=[]):
+def make_dangerous_list(installed, repolist=None):
"""
- Checks yum database for available packages, that have at least two
- different versions in repositories. Only not installed ones with
- all of their dependencies intalled are selected.
- And from those, few of the smallest are downloaded as rpms.
- @return list of instances of Package of selected packages
+ This makes a list of instances of Package for dangerous tests.
"""
- if ( use_cache and not force_update
- and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
- pkgdb = load_pkgdb(cache_dir)
- valid_db = True
- for pkg in pkgdb:
- if ( not rpm_exists(pkg, cache_dir, False)
- or not rpm_exists(pkg, cache_dir, True)):
- valid_db = False
- #print "Old package database is not valid"
- break
- if valid_db:
- return pkgdb
- #print "Getting installed packages"
- installed = set(check_output( #pylint: disable=E1103
- ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
#print "Getting all available packages"
- avail_str = run_yum('list', 'available', '--showduplicates',
+ avail_str = util.run_yum('list', 'available', '--showduplicates',
repolist=repolist)
# list of lists of packages with the same name, longer than 2
#print "Finding duplicates"
@@ -485,11 +454,88 @@ def get_pkg_database(force_update=False, use_cache=True,
number_of_packages=MAX_PKG_DB_SIZE*5,
repolist=repolist)
#print "Selecting the smallest ones"
- pkgdb = _sorted_db_by_size(selected, repolist=repolist)
+ return _sorted_db_by_size(selected, repolist=repolist)
+
+def make_safe_list(installed, exclude=None):
+ """
+ Makes list of installed packages for non-dangerous tests.
+ @param installed is a list of installed packages names
+ @param exclude is a list of package names, that won't appear in result
+ """
+ if exclude is None:
+ exclude = set()
+ base = list(installed)
+ random.shuffle(base)
+ res = []
+ i = 0
+ while len(res) < MAX_PKG_DB_SIZE and i < len(base):
+ name = base[i]
+ i += 1
+ if name in exclude:
+ continue
+ cmd = [ "rpm", "-q", "--qf", "%{EPOCH}:%{NVRA}\n", name ]
+ envra = check_output(cmd).splitlines()[0]
+ res.append(_match2pkg(util.RE_ENVRA.match(envra), safe=True))
+ return res
+
+def get_pkg_database(
+ force_update=False,
+ use_cache=True,
+ dangerous=False,
+ cache_dir='',
+ repolist=None):
+ """
+ Checks yum database for available packages, that have at least two
+ different versions in repositories. Only not installed ones with
+ all of their dependencies intalled are selected.
+ And from those, few of the smallest are downloaded as rpms.
+ @param dangerous whether to load available, not installed
+ packages for dangerous tests
+ @return (safe, dangerous)
+ where
+ safe is a list of instances of Package, representing installed
+ software, these should be used for not-dangerous tests;
+ both older
+ dangerous is a list of instances of Package of selected packages,
+ that are not installed, but available; instances contain
+ both newer and older version of package
+ """
+ dangerous_pkgs = []
+ if ( use_cache and not force_update
+ and os.path.exists(os.path.join(cache_dir, DB_BACKUP_FILE))):
+ safe_pkgs, dangerous_pkgs = load_pkgdb(cache_dir)
+ valid_db = True
+ if len(safe_pkgs) < MAX_PKG_DB_SIZE:
+ valid_db = False
+ elif not dangerous and len(dangerous_pkgs) > 0:
+ dangerous_pkgs = []
+ elif dangerous and len(dangerous_pkgs) == 0:
+ valid_db = False
+ else:
+ for pkg in safe_pkgs:
+ if not is_pkg_installed(pkg):
+ valid_db = False
+ break
+ for pkg in dangerous_pkgs:
+ if ( not rpm_exists(pkg, cache_dir, False)
+ or not rpm_exists(pkg, cache_dir, True)):
+ valid_db = False
+ #print "Old package database is not valid"
+ break
+ if valid_db:
+ return (safe_pkgs, dangerous_pkgs)
+ #print "Getting installed packages"
+ installed = set(check_output( #pylint: disable=E1103
+ ['rpm', '-qa', '--qf=%{NAME}\n']).splitlines())
+ if dangerous:
+ dangerous_pkgs = make_dangerous_list(installed)
+ safe_pkgs = make_safe_list(installed, exclude=set(
+ pkg.name for pkg in dangerous_pkgs))
+
if use_cache:
- repolist = _get_repo_list()
- _download_pkgdb(repolist, pkgdb, cache_dir)
+ repolist = _get_repo_list() if repolist in (None, []) else repolist
+ _download_dangerous(repolist, dangerous_pkgs, cache_dir)
#print "Backing up database information"
- write_pkgdb(pkgdb, cache_dir)
- return pkgdb
+ write_pkgdb(safe_pkgs, dangerous_pkgs, cache_dir)
+ return (safe_pkgs, dangerous_pkgs)
diff --git a/src/software/test/run.py b/src/software/test/run.py
index 950b0c9..00ef304 100755
--- a/src/software/test/run.py
+++ b/src/software/test/run.py
@@ -68,6 +68,7 @@ def parse_cmd_line():
parser.add_argument("-p", "--password",
default=os.environ.get("LMI_CIMOM_PASSWORD", ''),
help="User's password.")
+
dangerous_group = parser.add_mutually_exclusive_group()
dangerous_group.add_argument("--run-dangerous", action="store_true",
default=(os.environ.get('LMI_RUN_DANGEROUS', '0') == '1'),
@@ -78,6 +79,17 @@ def parse_cmd_line():
dest="run_dangerous",
default=os.environ.get('LMI_RUN_DANGEROUS', '0') == '1',
help="Disable dangerous tests.")
+
+ tedious_group = parser.add_mutually_exclusive_group()
+ tedious_group.add_argument("--run-tedious", action="store_true",
+ default=(os.environ.get('LMI_RUN_TEDIOUS', '0') == '1'),
+ help="Run also tedious (long running) for this machine."
+ " Overrides environment variable LMI_RUN_TEDIOUS.")
+ tedious_group.add_argument('--no-tedious', action="store_false",
+ dest="run_tedious",
+ default=os.environ.get('LMI_RUN_TEDIOUS', '0') == '1',
+ help="Disable tedious tests.")
+
cache_group = parser.add_mutually_exclusive_group()
cache_group.add_argument("-c", "--use-cache", action="store_true",
default=(os.environ.get('LMI_SOFTWARE_USE_CACHE', '0') == '1'),
@@ -160,6 +172,8 @@ def prepare_environment(args):
os.environ['LMI_CIMOM_PASSWORD'] = args.password
os.environ['LMI_RUN_DANGEROUS'] = (
'1' if args.run_dangerous else '0')
+ os.environ["LMI_RUN_TEDIOUS"] = (
+ '1' if args.run_tedious else '0')
os.environ['LMI_SOFTWARE_USE_CACHE'] = '1' if args.use_cache else '0'
if args.use_cache:
os.environ['LMI_SOFTWARE_CACHE_DIR'] = CACHE_DIR
@@ -270,9 +284,12 @@ def main():
repolist = args.test_repos.split(',')
else:
repolist = []
- rpmcache.get_pkg_database(args.force_update, args.use_cache,
- CACHE_DIR, repolist=repolist)
- #rpmcache.make_pkg_database(packages
+ rpmcache.get_pkg_database(
+ args.force_update,
+ args.use_cache,
+ dangerous=args.run_dangerous,
+ cache_dir=CACHE_DIR,
+ repolist=repolist)
prepare_environment(args)
test_program = unittest.main(argv=ut_args,
testLoader=LMITestLoader(), exit=False)
diff --git a/src/software/test/test_hosted_software_collection.py b/src/software/test/test_hosted_software_collection.py
new file mode 100755
index 0000000..dc798d8
--- /dev/null
+++ b/src/software/test/test_hosted_software_collection.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_MemberOfSoftwareCollection provider.
+"""
+
+import pywbem
+import socket
+import unittest
+
+import base
+
+class TestHostedSoftwareCollection(base.SoftwareBaseTestCase):
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_HostedSoftwareCollection"
+ KEYS = ("Antecedent", "Dependent")
+
+ def make_op(self):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["Antecedent"] = pywbem.CIMInstanceName(
+ classname="Linux_ComputerSystem",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "CreationClassName" : "Linux_ComputerSystem",
+ "Name" : socket.gethostname()
+ }))
+ objpath["Dependent"] = pywbem.CIMInstanceName(
+ classname="LMI_SystemSoftwareCollection",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : "LMI:SystemSoftwareCollection"
+ }))
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ objpath = self.make_op()
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ self.assertEqual(inst.path, objpath)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath, inst.path)
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ # try with CIM_ prefix
+ antecedent = objpath["Antecedent"].copy()
+ objpath["Antecedent"].classname = "CIM_ComputerSystem"
+
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ objpath["Antecedent"] = antecedent.copy()
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ # try with CIM_ prefix also for CreationClassName
+ objpath["Antecedent"]["CreationClassName"] = "CIM_ComputerSystem"
+
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsInstance(inst, pywbem.CIMInstance)
+ objpath["Antecedent"] = antecedent.copy()
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(sorted(inst.properties.keys()), sorted(self.KEYS))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], inst.path[key])
+
+ def test_enum_instances(self):
+ """
+ Tests EnumInstances call on installed packages.
+ """
+ objpath = self.make_op()
+ insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+ self.assertEqual(1, len(insts))
+ self.assertEqual(objpath, insts[0].path)
+ self.assertEqual(sorted(insts[0].properties.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath, insts[0].path)
+ for key in self.KEYS:
+ self.assertEqual(insts[0][key], insts[0].path[key])
+
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ objpath = self.make_op()
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertEqual(1, len(inames))
+ self.assertEqual(objpath, inames[0])
+
+ def test_get_antecedent_referents(self):
+ """
+ Test ReferenceNames for ComputerSystem.
+ """
+ objpath = self.make_op()
+ refs = self.conn.ReferenceNames(
+ Role="Antecedent",
+ ObjectName=objpath["Antecedent"])
+ self.assertGreater(len(refs), 0)
+ refs = [ r for r in refs if "Dependent" in r
+ and r["Dependent"].classname == \
+ "LMI_SystemSoftwareCollection" ]
+ self.assertEqual(1, len(refs))
+ ref = refs[0]
+ self.assertEqual(ref, objpath)
+
+ @base.mark_dangerous
+ def test_get_dependent_referents(self):
+ """
+ Test ReferenceNames for SystemSoftwareCollection.
+ """
+ objpath = self.make_op()
+ refs = self.conn.AssociatorNames(
+ ObjectName=objpath["Dependent"],
+ Role="Dependent",
+ ResultRole="Antecedent",
+ ResultClass="Linux_ComputerSystem")
+ self.assertEqual(1, len(refs))
+ ref = refs[0]
+ self.assertEqual(objpath["Antecedent"], ref)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestHostedSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_member_of_software_collection.py b/src/software/test/test_member_of_software_collection.py
new file mode 100755
index 0000000..4d375c4
--- /dev/null
+++ b/src/software/test/test_member_of_software_collection.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_MemberOfSoftwareCollection provider.
+"""
+
+import pywbem
+import unittest
+
+import base
+
+class TestMemberOfSoftwareCollection(base.SoftwareBaseTestCase):
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_MemberOfSoftwareCollection"
+ KEYS = ("Collection", "Member")
+
+ def make_op(self, pkg, newer=True):
+ """
+ @return object path of MembeOfSoftwareCollection association
+ """
+ objpath = self.objpath.copy()
+ objpath["Collection"] = pywbem.CIMInstanceName(
+ classname="LMI_SystemSoftwareCollection",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : "LMI:SystemSoftwareCollection"
+ }))
+ objpath["Member"] = pywbem.CIMInstanceName(
+ classname="LMI_SoftwareIdentity",
+ namespace="root/cimv2",
+ keybindings=pywbem.NocaseDict({
+ "InstanceID" : 'LMI:PKG:' + pkg.get_nevra(newer=newer,
+ with_epoch="ALWAYS")
+ }))
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath,
+ "Object paths should match for package %s"%pkg)
+ for key in self.KEYS:
+ self.assertTrue(inst.properties.has_key(key),
+ "OP is missing \"%s\" key for package %s"%(key, pkg))
+ self.assertEqual(objpath, inst.path)
+
+ @base.mark_tedious
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertGreater(len(inames), 0)
+ objpath = self.make_op(self.safe_pkgs[0])
+ for iname in inames:
+ self.assertIsInstance(iname, pywbem.CIMInstanceName)
+ self.assertEqual(iname.namespace, 'root/cimv2')
+ self.assertEqual(iname.classname, self.CLASS_NAME)
+ self.assertEqual(sorted(iname.keys()), sorted(self.KEYS))
+ self.assertEqual(objpath["Collection"], iname["Collection"])
+ nevra_set = set(i["Member"]["InstanceID"] for i in inames)
+ for pkg in self.safe_pkgs:
+ nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS")
+ self.assertTrue(nevra in nevra_set,
+ 'Missing nevra "%s".' % nevra)
+
+# @base.mark_tedious
+# def test_enum_instances(self):
+# """
+# Tests EnumInstances call on installed packages.
+# """
+# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+# self.assertGreater(len(insts), 0)
+# for inst in insts:
+# self.assertIsInstance(inst, pywbem.CIMInstance)
+# self.assertEqual(inst.namespace, 'root/cimv2')
+# self.assertEqual(inst.classname, self.CLASS_NAME)
+# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS))
+# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+# nevra_set = set(i["Member"]["InstanceID"] for i in insts)
+# for pkg in self.safe_pkgs:
+# self.assertIn(pkg.get_nevra(with_epoch="ALWAYS"), nevra_set)
+
+ @base.mark_tedious
+ def test_get_collection_referents(self):
+ """
+ Test ReferenceNames for SystemSoftwareCollection.
+ """
+ objpath = self.make_op(self.safe_pkgs[0])
+ refs = self.conn.AssociatorNames(
+ Role="Collection",
+ ObjectName=objpath["Collection"],
+ ResultRole="Member",
+ ResultClass="LMI_SoftwareIdentity")
+ self.assertGreater(len(refs), 0)
+ for ref in refs:
+ self.assertIsInstance(ref, pywbem.CIMInstanceName)
+ self.assertEqual(ref.namespace, 'root/cimv2')
+ self.assertEqual(ref.classname, "LMI_SoftwareIdentity")
+ self.assertEqual(sorted(ref.keys()), ["InstanceID"])
+ self.assertTrue(ref["InstanceID"].startswith("LMI:PKG:"))
+ nevra_set = set(i["InstanceID"] for i in refs)
+ # NOTE: installed packages might not be available
+ for pkg in self.safe_pkgs:
+ nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS")
+ self.assertTrue(nevra in nevra_set,
+ 'Missing nevra "%s".' % nevra)
+
+ def test_get_member_referents(self):
+ """
+ Test ReferenceNames for SoftwareIdentity.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ refs = self.conn.AssociatorNames(
+ ObjectName=objpath["Member"],
+ Role="Member",
+ ResultRole="Collection",
+ ResultClass="LMI_SystemSoftwareCollection")
+ self.assertEqual(len(refs), 1)
+ ref = refs[0]
+ self.assertEqual(objpath["Collection"], ref)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestMemberOfSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_software_file_check.py b/src/software/test/test_software_file_check.py
deleted file mode 100755
index 7b97ee9..0000000
--- a/src/software/test/test_software_file_check.py
+++ /dev/null
@@ -1,527 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareFileCheck provider.
-"""
-
-import hashlib
-import os
-import pywbem
-import re
-import subprocess
-import unittest
-from collections import namedtuple
-
-import common
-
-RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*')
-
-PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103
- "exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime")
-
-class TestSoftwareFileCheck(common.SoftwareBaseTestCase):
- #pylint: disable=R0904
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwareFileCheck"
- KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState"
- , "TargetOperatingSystem", "Version")
-
- hash_num2algo = {
- 1 : hashlib.md5, #pylint: disable=E1101
- 2 : hashlib.sha1, #pylint: disable=E1101
- 8 : hashlib.sha256, #pylint: disable=E1101
- 9 : hashlib.sha384, #pylint: disable=E1101
- 10 : hashlib.sha512, #pylint: disable=E1101
- 11 : hashlib.sha224 #pylint: disable=E1101
- }
-
- hash_num2cmd = {
- 1 : "md5sum",
- 2 : "sha1sum",
- 8 : "sha256sum",
- 9 : "sha384sum",
- 10 : "sha512sum",
- 11 : "sha224sum"
- }
-
- hash_num2length = {
- 1 : 32,
- 2 : 40,
- 8 : 64,
- 9 : 96,
- 10 : 128,
- 11 : 56
- }
-
- def make_op(self, pkg, filename, newer=True):
- """
- @return object path of LMI_SoftwareFileCheck
- """
- objpath = self.objpath.copy()
- objpath["Name"] = filename
- objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver")
- objpath["CheckID"] = "%s#%s" % (pkg.name, filename)
- objpath["SoftwareElementState"] = pywbem.Uint16(2)
- objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
- objpath["SoftwareElementID"] = pkg.get_nevra(newer)
- return objpath
-
- def assertEqualSEID(self, id1, id2, msg=None):
- """
- This is for comparison of SoftwareElementID property values.
- """
- if ( not isinstance(id1, basestring)
- or not isinstance(id2, basestring)):
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- match1 = common.RE_NEVRA.match(id1)
- match2 = common.RE_NEVRA.match(id2)
- if not match1 or not match2:
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- if any( match1.group(g) != match2.group(g)
- for g in ('name', 'ver', 'rel', 'arch')):
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- epoch1 = match1.group('epoch')
- epoch2 = match2.group('epoch')
- if not epoch1:
- epoch1 = '0'
- if not epoch2:
- epoch2 = '0'
- if epoch1 != epoch2:
- return common.SoftwareBaseTestCase.assertEqual(self, id1, id2, msg)
- return True
-
- def assertEqual(self, op1, op2, msg=None):
- """
- This is override for object paths, that allows some differences
- (like missing epoch in SoftwareElementID).
- """
- if ( not isinstance(op1, pywbem.CIMInstanceName)
- or not isinstance(op2, pywbem.CIMInstanceName)
- or op1.classname != op2.classname
- or op1.namespace != op2.namespace
- or op1.keybindings.keys() != op2.keybindings.keys()
- or any(op1[a] != op2[a] for a in
- ( 'Name', 'Version', 'CheckID', 'SoftwareElementState'
- , 'TargetOperatingSystem'))):
- return common.SoftwareBaseTestCase.assertEqual(self, op1, op2, msg)
- return self.assertEqualSEID(
- op1['SoftwareElementID'], op2['SoftwareElementID'], msg)
-
- def make_checksum_str(self, csumnum, filename):
- """
- @return checksum of installed file
- """
- return RE_CHECKSUM.match(subprocess.check_output([
- self.hash_num2cmd[csumnum], filename])).group(1).lower()
-
- def do_check_symlink(self, pkg, filepath, inst):
- """
- Assert some details about symlink.
- """
- target = os.readlink(filepath)
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(3), inst["FileType"],
- "Unexpected file type of symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected uid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode of symlink for %s:%s" %(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(target, inst["LinkTarget"],
- "Unexpected size of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]],
- inst["FileChecksum"],
- "Unexpected hash of symlink for %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime of symlink for %s:%s"%(pkg.name, filepath))
-
- # modify owner
- prev_user = inst["FileUserID"]
- #prev_mtime = inst["LastModificationTime"]
- #prev_pflags = PassedFlags(*inst["PassedFlags"])
- os.lchown(filepath, stats.st_uid + 1, -1)
- inst = self.conn.GetInstance(InstanceName=inst.path)
- self.assertEqual(inst["ExpectedFileUserID"] + 1, inst["FileUserID"],
- "Unexpected uid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(prev_user + 1, inst["FileUserID"],
- "Unexpected uid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid of modified symlink for %s:%s"%(
- pkg.name, filepath))
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- #self.assertGreater(inst["LastModificationTime"], prev_mtime)
-
- self.assertTrue(cur_pflags.exists,
- "Symlink %s:%s should exist." %(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "File type should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.size,
- "File size should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "File mode should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.checksum,
- "File checksum should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device number should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.ltarget,
- "Link target should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.uid,
- "Uid should not match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "Gid shoud match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mtime,
- "Mtime should match for symlink %s:%s"%(pkg.name, filepath))
-
- # modify link_target
- os.remove(filepath)
- os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath)
- os.lchown(filepath, inst["ExpectedFileUserID"],
- inst["ExpectedFileGroupID"])
-
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertGreater(len(inst["LinkTarget"]),
- len(inst["ExpectedLinkTarget"]))
-
- self.assertTrue(cur_pflags.exists,
- "File %s:%s should exist"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "File type should match for symlink %s:%s"%(pkg.name, filepath))
- # file size not checked for symlinks
- #self.assertFalse(cur_pflags.size,
- #"File size should not match for symlink %s:%s"%(
- #pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "File mode should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.checksum,
- "Checksum should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device numbers should match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Link target should not match for symlink %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "File uid should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "File gid should match for symlink %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.mtime,
- "File mtime should match for symlink %s:%s"%(pkg.name, filepath))
-
- def do_check_directory(self, pkg, filepath, inst):
- """
- Assert some details about directory.
- """
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(2), inst["FileType"],
- "Unexpected type for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected uid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size for directory %s:%s"%(pkg.name, filepath))
- self.assertIs(inst["LinkTarget"], None)
- self.assertEqual("0"*self.hash_num2length[inst["FileChecksumType"]],
- inst["FileChecksum"],
- "Unexpected checksum for directory %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime for directory %s:%s"%(pkg.name, filepath))
-
- def do_check_file(self, pkg, filepath, inst):
- """
- Assert some details about regurar file.
- """
- stats = os.lstat(filepath)
-
- self.assertEqual(pywbem.Uint16(1), inst["FileType"],
- "Unexpected file type for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_uid, inst["FileUserID"],
- "Unexpected file uid for %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_gid, inst["FileGroupID"],
- "Unexpected gid for regular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_mode, inst["FileMode"],
- "Unexpected mode for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(stats.st_size, inst["FileSize"],
- "Unexpected size for reqular file %s:%s"%(pkg.name, filepath))
- self.assertIs(inst["LinkTarget"], None)
- csum = self.make_checksum_str(inst['FileChecksumType'], filepath)
- self.assertEqual(csum, inst["FileChecksum"].lower(),
- "Unexpected checksum for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
- self.assertEqual(int(stats.st_mtime), inst["LastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
-
- # make it longer
- with open(filepath, "a+") as fobj:
- fobj.write("data\n")
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertGreater(inst["FileSize"], inst["ExpectedFileSize"],
- "File size should be greater, then expected for reqular file"
- " %s:%s"%(pkg.name, filepath))
- self.assertGreater(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "Unexpected mtime for reqular file %s:%s"%(pkg.name, filepath))
-
- self.assertTrue(cur_pflags.exists,
- "Regular file should exist %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.type,
- "Type of regular file should match for %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.mode,
- "Mode should match for regular file %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Checksum should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device number should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.ltarget,
- "Link target should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "File uid should match for %s:%s"%(pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "File gid should match for %s:%s"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "File mtime should not match for %s:%s"%(pkg.name, filepath))
-
- # change file type
- os.remove(filepath)
- os.symlink(filepath, filepath)
- os.lchown(filepath, inst["ExpectedFileUserID"],
- inst["ExpectedFileGroupID"])
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertNotEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"],
- "Link target should not match for %s:%s"%(pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"],
- "File size should not match for %s:%s"%(pkg.name, filepath))
- self.assertGreater(inst["LastModificationTime"],
- inst["ExpectedLastModificationTime"],
- "File mtime should be greater than expected for %s:%s"%(
- pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileType"], inst["FileType"],
- "File type should not match for %s:%s"%(pkg.name, filepath))
- self.assertEqual(pywbem.Uint16(3), inst["FileType"],
- "File type should match for %s:%s"%(pkg.name, filepath))
-
- self.assertTrue(cur_pflags.exists,
- "Regular file %s:%s should exist"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.type,
- "Regular file type should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mode,
- "File mode should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Checksum should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.dev,
- "Device should match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Link target should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.uid,
- "Regular file's uid should match for %s:%s"%(
- pkg.name, filepath))
- self.assertTrue(cur_pflags.gid,
- "Regular file's gid should match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "Regular file's mtime should not match for %s:%s"%(
- pkg.name, filepath))
-
- # remove it
- os.remove(filepath)
- inst = self.conn.GetInstance(InstanceName=inst.path)
- cur_pflags = PassedFlags(*inst["PassedFlags"])
- self.assertEqual(inst["ExpectedLinkTarget"], inst["LinkTarget"],
- "Link target does not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertNotEqual(inst["ExpectedFileSize"], inst["FileSize"],
- "File size should not match for regular file %s:%s"%(
- pkg.name, filepath))
- self.assertIsNone(inst["LastModificationTime"])
- self.assertIsNone(inst["FileType"])
- self.assertIsNone(inst["FileChecksum"])
- self.assertIsNone(inst["FileMode"])
- self.assertIsNone(inst["FileUserID"])
- self.assertIsNone(inst["FileGroupID"])
-
- self.assertFalse(cur_pflags.exists,
- "Regular file %s:%s should not exist"%(pkg.name, filepath))
- self.assertFalse(cur_pflags.type,
- "Regular file's type should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.size,
- "Regular file's size should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mode,
- "Regular file's mode should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.checksum,
- "Regular file's checksum should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.dev,
- "Regular file's dev number should not match %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.ltarget,
- "Regular file's link target should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.uid,
- "Regular file's uid should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.gid,
- "Regular file's guid should not match for %s:%s"%(
- pkg.name, filepath))
- self.assertFalse(cur_pflags.mtime,
- "Regular file's mtime should not match for %s:%s"%(
- pkg.name, filepath))
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call.
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if ( common.is_installed(pkg)
- and not common.verify_pkg(pkg.name)):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
-
- inst = self.conn.GetInstance(
- InstanceName=objpath,
- LocalOnly=False)
- self.assertIsInstance(inst, pywbem.CIMInstance)
- self.assertEqual(objpath, inst.path,
- msg="Object paths of instance must match for %s:%s"%(
- pkg.name, filepath))
- for key in self.KEYS:
- if key.lower() == "softwareelementid":
- self.assertEqualSEID(inst[key], objpath[key],
- "OP key %s values should match for %s:%s"%(
- key, pkg.name, filepath))
- elif key.lower() == "targetoperatingsystem":
- self.assertIsInstance(objpath[key], (int, long))
- else:
- self.assertEqual(objpath[key], inst[key],
- "OP key %s values should match for %s:%s"%(
- key, pkg.name, filepath))
-
- self.assertTrue(inst["FileExists"],
- "File %s:%s must exist"%(pkg.name, filepath))
- self.assertEqual(10, len(inst["PassedFlags"]),
- "PassedFlags must have constant length")
- for i, flag in enumerate(inst["PassedFlags"]):
- self.assertTrue(flag is True,
- "Flag \"%s\" should all match for file %s:%s"%(
- inst["PassedFlagsDescriptions"][i], pkg.name, filepath))
- for prop in ( "FileType", "FileUserID", "FileGroupID"
- , "FileMode", "FileSize", "LinkTarget"
- , "FileChecksum", "FileModeFlags"):
- if ( ( os.path.islink(filepath)
- or (not os.path.isfile(filepath)))
- and prop == "FileSize"):
- continue
- self.assertEqual(inst["Expected"+prop], inst[prop],
- "%s should match for %s:%s"%(prop, pkg.name, filepath))
- if os.path.islink(filepath):
- self.do_check_symlink(pkg, filepath, inst)
- elif os.path.isdir(filepath):
- self.do_check_directory(pkg, filepath, inst)
- elif os.path.isfile(filepath):
- self.do_check_file(pkg, filepath, inst)
-
- @common.mark_dangerous
- def test_invoke_method(self):
- """
- Tests Invoke method invocation.
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if common.is_installed(pkg) and not common.verify_pkg(pkg.name):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
- for filepath in files:
- objpath = self.make_op(pkg, filepath)
-
- (rval, _) = self.conn.InvokeMethod(
- MethodName="Invoke",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- msg="InvokeMethod should be successful for %s:%s"%(
- pkg.name, filepath))
-
- # modify file
- if os.path.isfile(filepath):
- os.remove(filepath)
- else:
- os.lchown(filepath, os.stat(filepath).st_uid + 1, -1)
- (rval, _) = self.conn.InvokeMethod(
- MethodName="Invoke",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(2), rval,
- "InvokeMethod should not pass for modified file %s:%s"%(
- pkg.name, filepath))
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwareFileCheck)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/software/test/test_software_identity.py b/src/software/test/test_software_identity.py
new file mode 100755
index 0000000..934d4c9
--- /dev/null
+++ b/src/software/test/test_software_identity.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_SoftwareIdentity provider.
+"""
+
+from datetime import datetime, timedelta
+import itertools
+import pywbem
+import unittest
+
+import base
+import rpmcache
+import util
+
+class TestSoftwareIdentity(base.SoftwareBaseTestCase): #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_SoftwareIdentity"
+ KEYS = ("InstanceID", )
+
+ def make_op(self, pkg, newer=True):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["InstanceID"] = 'LMI:PKG:'+pkg.get_nevra(newer, "ALWAYS")
+ return objpath
+
+ @base.mark_dangerous
+ def test_get_instance(self):
+ """
+ Dangerous test, making sure, that properties are set correctly
+ for installed and removed packages.
+ """
+ for pkg in self.dangerous_pkgs:
+ if rpmcache.is_pkg_installed(pkg.name):
+ rpmcache.remove_pkg(pkg.name)
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertIsNone(inst["InstallDate"])
+ time_stamp = datetime.now(pywbem.cim_types.MinutesFromUTC(0)) \
+ - timedelta(seconds=0.01)
+ rpmcache.install_pkg(pkg)
+ inst2 = self.conn.GetInstance(
+ InstanceName=objpath, LocalOnly=False)
+ self.assertIsNotNone(inst2["InstallDate"])
+ self.assertGreater(inst2["InstallDate"].datetime, time_stamp)
+
+ def test_get_instance_safe(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ for pkg in self.safe_pkgs:
+ objpath = self.make_op(pkg)
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+ self.assertEqual(inst.path, objpath,
+ "Object paths should match for package %s"%pkg)
+ for key in self.KEYS:
+ self.assertTrue(inst.properties.has_key(key),
+ "OP is missing \"%s\" key for package %s"%(key, pkg))
+ self.assertIsInstance(inst["Caption"], basestring)
+ self.assertIsInstance(inst["Description"], basestring)
+ self.assertEqual(pkg.up_evra, inst["VersionString"],
+ "VersionString does not match evra for pkg %s" % pkg)
+ self.assertTrue(inst['IsEntity'])
+ self.assertEqual(pkg.name, inst["Name"],
+ "Name does not match for pkg %s" % pkg)
+ self.assertIsInstance(inst["Epoch"], pywbem.Uint32,
+ "Epoch does not match for pkg %s" % pkg)
+ self.assertEqual(int(pkg.epoch), inst["Epoch"])
+ self.assertEqual(pkg.ver, inst["Version"],
+ "Version does not match for pkg %s" % pkg)
+ self.assertEqual(pkg.rel, inst["Release"],
+ "Release does not match for pkg %s" % pkg)
+ self.assertEqual(pkg.arch, inst["Architecture"],
+ "Architecture does not match for pkg %s" % pkg)
+
+ # try to leave out epoch part
+ if pkg.epoch == "0":
+ op_no_epoch = objpath.copy()
+ op_no_epoch["SoftwareElementID"] = util.make_nevra(pkg.name,
+ pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER")
+ self.assertNotIn(":", op_no_epoch["SoftwareElementID"])
+ inst = self.conn.GetInstance(
+ InstanceName=op_no_epoch, LocalOnly=False)
+ self.assertIn(inst.path, (objpath, op_no_epoch))
+
+ @base.mark_tedious
+ def test_enum_instance_names_safe(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertGreater(len(inames), 0)
+ for iname in inames:
+ self.assertIsInstance(iname, pywbem.CIMInstanceName)
+ self.assertEqual(iname.namespace, 'root/cimv2')
+ self.assertEqual(sorted(iname.keys()), sorted(self.KEYS))
+ nevra_set = set(i["InstanceID"] for i in inames)
+ for pkg in self.safe_pkgs:
+ self.assertIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"),
+ nevra_set)
+
+# @base.mark_tedious
+# def test_enum_instances(self):
+# """
+# Tests EnumInstances call on installed packages.
+# """
+# insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+# self.assertGreater(len(insts), 0)
+# for inst in insts:
+# self.assertIsInstance(inst, pywbem.CIMInstance)
+# self.assertEqual(inst.namespace, 'root/cimv2')
+# self.assertEqual(sorted(inst.keys()), sorted(self.KEYS))
+# self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+# nevra_set = set()
+# name_set = set()
+# for inst in insts:
+# nevra_set.add(inst["InstanceID"])
+# name_set.add(inst["Name"])
+# for pkg in self.safe_pkgs:
+# self.assertIn("LMI:PKG:"+pkg.get_nevra(with_epoch="ALWAYS"),
+# nevra_set)
+# self.assertIn(pkg.name, name_set)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSoftwareIdentity)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py
deleted file mode 100755
index 93669f9..0000000
--- a/src/software/test/test_software_installed_package.py
+++ /dev/null
@@ -1,351 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareInstalledPackage provider.
-"""
-
-import os
-import pywbem
-import socket
-import stat
-import unittest
-
-import common
-
-def make_path_tuple(objpath):
- return tuple(objpath[a] for a in ("SoftwareElementID", "Name", "Version",
- "SoftwareElementState"))
-
-class TestSoftwareInstalledPackage(common.SoftwareBaseTestCase):
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwareInstalledPackage"
- KEYS = ("Software", "System")
-
- def make_op(self, pkg, newer=True, ses=2):
- """
- @param ses SoftwareElementState property value
- @return object path of LMI_SoftwareInstaledPackage
- """
- objpath = self.objpath.copy()
- pkg_op = pywbem.CIMInstanceName(
- classname="LMI_SoftwarePackage", namespace="root/cimv2",
- keybindings={
- "Name" : pkg.name,
- "SoftwareElementID" : pkg.get_nevra(newer, 'ALWAYS'),
- "SoftwareElementState" : pywbem.Uint16(ses),
- "TargetOperatingSystem" : pywbem.Uint16(36),
- "Version" : getattr(pkg, 'up_ver' if newer else 'ver') })
- system_op = pywbem.CIMInstanceName(
- classname="Linux_ComputerSystem", namespace="root/cimv2",
- keybindings={
- "CreationClassName" : "Linux_ComputerSystem",
- "Name" : socket.gethostname() })
- objpath["Software"] = pkg_op
- objpath["System"] = system_op
- return objpath
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call on packages from our rpm cache.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg):
- self.install_pkg(pkg)
- objpath = self.make_op(pkg)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertIsSubclass(inst.path.classname, self.CLASS_NAME)
- self.assertIsSubclass(
- inst.path['System'].classname, objpath['System'].classname)
- self.assertIsSubclass(inst.path['Software'].classname,
- objpath['Software'].classname)
- self.assertEqual(2, len(inst.path.keys()))
- for key in self.KEYS:
- self.assertEqual(inst[key], inst.path[key],
- "Object paths should be the same for %s"%pkg)
- self.assertEqual(inst['Software'], objpath['Software'],
- "Software key reference should match for %s"%pkg)
- common.remove_pkg(pkg.name)
- objpath['Software']['SoftwareElementState'] = pywbem.Uint16(1)
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0],
- "Package %s should not be installed"%pkg)
-
- @common.mark_dangerous
- def test_enum_instance_names(self):
- """
- Tests EnumInstances call.
- TODO: test this in non-dangerous way
- """
- pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None
- if pkg and common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- insts1 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
- if pkg:
- self.install_pkg(pkg)
- insts2 = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
- self.assertEqual(len(insts1) + 1, len(insts2))
-
- if pkg:
- objpath = self.make_op(pkg)
- self.assertIn(make_path_tuple(objpath['Software']),
- set(make_path_tuple(inst['Software']) for inst in insts2))
- self.assertTrue(all(isinstance(inst, pywbem.CIMInstanceName)
- for inst in insts1))
-
- @common.mark_dangerous
- def test_enum_instances(self):
- """
- Tests EnumInstances call.
- """
- pkg = self.pkgdb[0] if len(self.pkgdb) > 0 else None
- if pkg and not common.is_installed(pkg.name):
- self.install_pkg(pkg.name)
- insts1 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
- if pkg:
- common.remove_pkg(pkg.name)
- insts2 = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
- objpath = self.make_op(pkg)
- self.assertEqual(len(insts2) + 1, len(insts1))
- path_values = set(make_path_tuple(p["Software"]) for p in insts1)
- self.assertIn(make_path_tuple(objpath['Software']), path_values)
- path_values = set(make_path_tuple(p["Software"]) for p in insts2)
- self.assertNotIn(make_path_tuple(objpath['Software']), path_values)
-
- self.assertTrue(all(inst['Software'] == inst.path['Software']
- for inst in insts1))
- self.assertTrue(all(inst['System'] == inst.path['System']
- for inst in insts1))
-
- @common.mark_dangerous
- def test_create_instance(self):
- """
- Tests CreateInstance call.
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s should not be installed"%pkg)
- objpath = self.make_op(pkg, ses=1)
- inst = pywbem.CIMInstance(classname=objpath.classname, path=objpath)
- inst["Software"] = objpath["Software"]
- inst["System"] = objpath["System"]
- iname = self.conn.CreateInstance(NewInstance=inst)
- self.assertIsInstance(iname, pywbem.CIMInstanceName)
- objpath["Software"]["SoftwareElementState"] = pywbem.Uint16(2)
- self.assertEqual(iname["Software"], objpath["Software"],
- "Software key reference should match for %s"%pkg)
- self.assertIsInstance(iname["System"], pywbem.CIMInstanceName)
- self.assertIsSubclass(iname["System"].classname,
- objpath["System"].classname)
- self.assertEqual(set(iname.keys()), set(objpath.keys()),
- "Keys of object paths should be the same for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s should be installed"%pkg)
-
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.CreateInstance(NewInstance=inst)
- self.assertEqual(pywbem.CIM_ERR_ALREADY_EXISTS,
- cmngr.exception.args[0],
- "Package %s should already be installed"%pkg)
-
- @common.mark_dangerous
- def test_delete_instance(self):
- """
- Tests DeleteInstance call.
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- objpath = self.make_op(pkg)
- self.conn.DeleteInstance(objpath)
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s must be uninstalled"%pkg)
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.DeleteInstance(objpath)
- self.assertIn(cmngr.exception.args[0],
- [pywbem.CIM_ERR_FAILED, pywbem.CIM_ERR_NOT_FOUND],
- "Package %s can not be uninstalled again"%pkg)
-
- @common.mark_dangerous
- def test_check_integrity(self):
- """
- Tests CheckIntegrity call.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- files = self.pkg_files[pkg.name]
- if ( ( common.is_installed(pkg)
- and not common.verify_pkg(pkg.name))
- or ( common.is_installed(pkg.name)
- and not common.is_installed(pkg))) :
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- objpath = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "IntegrityCheck should pass for %s"%pkg.name) # check passed
- self.assertEqual(1, len(oparms))
- self.assertIn("Failed", oparms)
- self.assertEqual(0, len(oparms["Failed"]),
- "IntegrityCheck should not fail for %s"%pkg.name)
-
- cnt_bad = 0
- for file_path in files:
- stats = os.lstat(file_path)
- if os.path.islink(file_path): # modify symbolic link
- target = os.readlink(file_path)
- os.remove(file_path)
- os.symlink(target, file_path) # just touch symlink
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- # symlink must pass
- self.assertEqual(cnt_bad, len(oparms["Failed"]),
- "Symlink %s:%s should pass"%(pkg.name, file_path))
- os.remove(file_path)
- # now change target
- os.symlink("wrong_link_target", file_path)
- elif os.path.isdir(file_path): # check directory
- os.chmod(file_path, stats.st_mode) # just touch dir
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- # dir must pass
- self.assertEqual(cnt_bad, len(oparms["Failed"]),
- "Directory %s:%s should pass"%(pkg.name, file_path))
- # modify read access of directory
- os.chmod(file_path, stats.st_mode ^ stat.S_IROTH)
- else: # modify regular file
- # just touch file - this is enough to make it fail
- with open(file_path, "w+"):
- pass
- cnt_bad += 1
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="CheckIntegrity",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(1), rval,
- "File %s:%s should not pass"%(pkg.name, file_path))
- self.assertEqual(1, len(oparms))
- self.assertIn("Failed", oparms)
- self.assertEqual(len(oparms["Failed"]), cnt_bad,
- "Number of errors not correct. Failed for %s:%s" % (
- pkg.name, file_path))
- self.assertIn(file_path, (p["Name"] for p in oparms["Failed"]),
- "File %s:%s should also be in failed"%(pkg.name, file_path))
-
- @common.mark_dangerous
- def test_method_update(self):
- """
- Tests Update method invocation.
- """
- for pkg in self.pkgdb:
- if ( common.is_installed(pkg.name)
- and not common.is_installed(pkg, False)):
- common.remove_pkg(pkg.name)
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg, False)
- self.assertTrue(common.is_installed(pkg, False),
- "Package %s must be installed"%pkg.get_nevra(False))
-
- objpath = self.make_op(pkg, False)
- op_up = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Update should succed for %s"%pkg.get_nevra(False))
- self.assertEqual(1, len(oparms))
- self.assertIn("Installed", oparms)
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg),
- "Package %s must be installed"%pkg)
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=op_up)
- self.assertEqual(pywbem.Uint16(0), rval,
- "Package %s should be already updated"%pkg)
- self.assertEqual(1, len(oparms))
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- self.assertFalse(common.is_installed(pkg, False),
- "Older package %s must not be installed" %
- pkg.get_nevra(False))
-
- with self.assertRaises(pywbem.CIMError) as cmngr:
- self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath)
- self.assertEqual(pywbem.CIM_ERR_NOT_FOUND, cmngr.exception.args[0],
- "Older package %s should not be installed" %
- pkg.get_nevra(False))
-
- common.remove_pkg(pkg.name)
- self.install_pkg(pkg, False)
- self.assertTrue(common.is_installed(pkg, False))
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath,
- Epoch=pkg.epoch,
- Version=pkg.ver,
- Release=pkg.rel)
- self.assertEqual(pywbem.Uint16(0), rval,
- "Update of package %s should succeed"%pkg)
- self.assertEqual(oparms["Installed"], objpath["Software"],
- "Object paths should be the same for package %s"%pkg)
-
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Update",
- ObjectName=objpath,
- Epoch=pkg.up_epoch,
- Version=pkg.up_ver,
- Release=pkg.up_rel)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Package %s can not be updated twice to highest version"%pkg)
- self.assertEqual(oparms["Installed"], op_up["Software"],
- "Object paths should match for package %s"%pkg)
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwareInstalledPackage)
-
-if __name__ == "__main__":
- unittest.main()
diff --git a/src/software/test/test_software_package.py b/src/software/test/test_software_package.py
deleted file mode 100755
index 4cd5da9..0000000
--- a/src/software/test/test_software_package.py
+++ /dev/null
@@ -1,153 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-"""
-Unit tests for LMI_SoftwareInstalledPackage provider.
-"""
-
-import pywbem
-import unittest
-
-import common
-import rpmcache
-
-class TestSoftwarePackage(common.SoftwareBaseTestCase): #pylint: disable=R0904
- """
- Basic cim operations test.
- """
-
- CLASS_NAME = "LMI_SoftwarePackage"
- KEYS = ( "Name", "SoftwareElementID", "SoftwareElementState"
- , "TargetOperatingSystem", "Version")
-
- def make_op(self, pkg, newer=True, ses=2):
- """
- @param ses SoftwareElementState property value
- @return object path of SoftwarePackage
- """
- objpath = self.objpath.copy()
- objpath["Name"] = pkg.name
- objpath["SoftwareElementID"] = pkg.get_nevra(newer, "ALWAYS")
- objpath["SoftwareElementState"] = pywbem.Uint16(ses)
- objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
- objpath["Version"] = getattr(pkg, 'up_ver' if newer else 'ver')
- return objpath
-
- @common.mark_dangerous
- def test_get_instance(self):
- """
- Tests GetInstance call on packages from our rpm cache.
- TODO: test this in non-dangerous way
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- objpath = self.make_op(pkg, ses=1)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(inst.path, objpath,
- "Object paths should match for %s"%pkg)
- for key in self.KEYS:
- self.assertTrue(inst.properties.has_key(key),
- "OP is missing \"%s\" key for package %s"%(key, pkg))
- if key == "TargetOperatingSystem":
- self.assertIsInstance(inst.path[key], (int, long))
- else:
- self.assertEqual(inst.path[key], inst[key],
- "Object paths of instance should match for %s"%pkg)
- self.assertEqual(pkg.up_rel, inst['Release'],
- "Release property should match for %s"%pkg)
- self.install_pkg(pkg)
- objpath['SoftwareElementState'] = pywbem.Uint16(2)
- inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
- self.assertEqual(inst.path, objpath,
- "Object paths should match for %s"%pkg)
-
- # try to leave out epoch part
- if pkg.epoch == "0":
- op_no_epoch = objpath.copy()
- op_no_epoch["SoftwareElementID"] = rpmcache.make_nevra(pkg.name,
- pkg.up_epoch, pkg.up_ver, pkg.up_rel, pkg.arch, "NEVER")
- self.assertNotIn(":", op_no_epoch["SoftwareElementID"])
- inst = self.conn.GetInstance(
- InstanceName=op_no_epoch, LocalOnly=False)
- self.assertIn(inst.path, (objpath, op_no_epoch))
-
- @common.mark_dangerous
- def test_method_install(self):
- """
- Tests Install method invocation.
- """
- for pkg in self.pkgdb:
- if common.is_installed(pkg.name):
- common.remove_pkg(pkg.name)
- objpath = self.make_op(pkg, ses=1)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Install",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(1), rval,
- "Installation of %s should be successful"%pkg)
- self.assertEqual(1, len(oparms))
- self.assertTrue(oparms.has_key('Installed'))
- objpath['SoftwareElementState'] = pywbem.Uint16(2)
- self.assertEqual(oparms['Installed'], objpath,
- "Object paths should match for %s"%pkg)
- self.assertTrue(common.is_installed(pkg.name),
- "Package %s must be installed"%pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Install",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "Installation of %s should fail"%pkg)
- self.assertEqual(len(oparms), 1)
- self.assertEqual(oparms['Installed'], objpath,
- "Object paths should match for %s"%pkg)
-
- @common.mark_dangerous
- def test_method_remove(self):
- """
- Tests Remove method invocation.
- """
- for pkg in self.pkgdb:
- if not common.is_installed(pkg.name):
- self.install_pkg(pkg)
- objpath = self.make_op(pkg)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Remove",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint16(1), rval,
- "Removal of package should succeed"%pkg)
- self.assertEqual(0, len(oparms))
- self.assertFalse(common.is_installed(pkg.name),
- "Package %s should not be installed"%pkg)
- objpath["SoftwareElementState"] = pywbem.Uint16(1)
- (rval, oparms) = self.conn.InvokeMethod(
- MethodName="Remove",
- ObjectName=objpath)
- self.assertEqual(pywbem.Uint32(0), rval,
- "Removal of %s should fail"%pkg)
- self.assertEqual(len(oparms), 0)
-
-def suite():
- """For unittest loaders."""
- return unittest.TestLoader().loadTestsFromTestCase(
- TestSoftwarePackage)
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/src/software/test/test_system_software_collection.py b/src/software/test/test_system_software_collection.py
new file mode 100755
index 0000000..193a4d5
--- /dev/null
+++ b/src/software/test/test_system_software_collection.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+Unit tests for LMI_SoftwareIdentity provider.
+"""
+
+import unittest
+
+import base
+
+class TestSystemSoftwareCollection(
+ base.SoftwareBaseTestCase): #pylint: disable=R0904
+ """
+ Basic cim operations test.
+ """
+
+ CLASS_NAME = "LMI_SystemSoftwareCollection"
+ KEYS = ("InstanceID", )
+
+ def make_op(self):
+ """
+ @param ses SoftwareElementState property value
+ @return object path of SoftwareIdentity
+ """
+ objpath = self.objpath.copy()
+ objpath["InstanceID"] = "LMI:SystemSoftwareCollection"
+ return objpath
+
+ def test_get_instance(self):
+ """
+ Tests GetInstance call on packages from our rpm cache.
+ """
+ objpath = self.make_op()
+ inst = self.conn.GetInstance(InstanceName=objpath, LocalOnly=False)
+
+ self.assertEqual(objpath, inst.path)
+ self.assertEqual(list(objpath.keys()), list(inst.path.keys()))
+ for key in self.KEYS:
+ self.assertEqual(inst[key], objpath[key])
+ self.assertIsInstance(inst["Caption"], basestring)
+
+ def test_enum_instance_names(self):
+ """
+ Tests EnumInstanceNames call on installed packages.
+ """
+ inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME)
+ self.assertEqual(len(inames), 1)
+ iname = inames[0]
+ self.assertEqual(iname, self.make_op())
+
+ def test_enum_instances(self):
+ """
+ Tests EnumInstances call on installed packages.
+ """
+ insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME)
+ self.assertEqual(len(insts), 1)
+ inst = insts[0]
+ self.assertEqual(inst.path, self.make_op())
+ self.assertEqual(inst["InstanceID"], inst.path["InstanceID"])
+ self.assertIsInstance(inst["Caption"], basestring)
+
+def suite():
+ """For unittest loaders."""
+ return unittest.TestLoader().loadTestsFromTestCase(
+ TestSystemSoftwareCollection)
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/src/software/test/util.py b/src/software/test/util.py
new file mode 100644
index 0000000..da55e6a
--- /dev/null
+++ b/src/software/test/util.py
@@ -0,0 +1,82 @@
+#!/usr/bin/python
+# -*- Coding:utf-8 -*-
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details. #
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Radek Novacek <rnovacek@redhat.com>
+# Authors: Michal Minar <miminar@redhat.com>
+
+"""
+Common test utilities.
+"""
+
+import re
+from subprocess import check_output
+
+RE_NEVRA = re.compile(
+ r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+RE_NEVRA_OPT_EPOCH = re.compile(
+ r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+RE_ENVRA = re.compile(
+ r'^(?P<epoch>\d+|\(none\)):(?P<name>.+)-(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
+
+def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
+ """
+ @param with_epoch may be one of:
+ "NOT_ZERO" - include epoch only if it's not zero
+ "ALWAYS" - include epoch always
+ "NEVER" - do not include epoch at all
+ """
+ estr = ''
+ if with_epoch.lower() == "always":
+ estr = epoch
+ elif with_epoch.lower() == "not_zero":
+ if epoch and epoch.lower() not in {"0", "(none)"}:
+ estr = epoch
+ if len(estr):
+ estr += ":"
+ return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
+
+def make_evra(epoch, ver, rel, arch):
+ """ @return evra string """
+ if not epoch or epoch.lower() == "(none)":
+ epoch = "0"
+ return "%s:%s-%s.%s" % (epoch, ver, rel, arch)
+
+def run_yum(*params, **kwargs):
+ """
+ Runs yum with params and returns its output
+ It's here especially to allow pass a repolist argument, that
+ specifies list of repositories, to run the command on.
+ """
+ cmd = ['yum'] + list(params)
+ repolist = kwargs.get('repolist', None)
+ if repolist is None:
+ repolist = []
+ if repolist:
+ cmd += ['--disablerepo=*']
+ cmd += ['--enablerepo='+r for r in repolist]
+ return check_output(cmd)
+
+def get_system_architecture():
+ """
+ @return the system architecture name as seen by rpm
+ """
+ return check_output(['rpm', '-q', '--qf', '%{ARCH}\n', 'rpm'])
+