diff options
Diffstat (limited to 'src/software')
-rw-r--r-- | src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py | 309 | ||||
-rw-r--r-- | src/software/openlmi/software/cimom_entry.py | 5 | ||||
-rw-r--r-- | src/software/test/test_installed_software_identity.py | 255 |
3 files changed, 568 insertions, 1 deletions
diff --git a/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py b/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py new file mode 100644 index 0000000..58c6518 --- /dev/null +++ b/src/software/openlmi/software/LMI_InstalledSoftwareIdentity.py @@ -0,0 +1,309 @@ +# Software Management Providers +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# + +"""Python Provider for LMI_InstalledSoftwareIdentity + +Instruments the CIM class LMI_InstalledSoftwareIdentity + +""" + +import pywbem +from pywbem.cim_provider2 import CIMProvider2 + +from openlmi.common import cmpi_logging +from openlmi.software.core import ComputerSystem +from openlmi.software.core import SoftwareIdentity +from openlmi.software.yumdb import YumDB + +class LMI_InstalledSoftwareIdentity(CIMProvider2): + """Instrument the CIM class LMI_InstalledSoftwareIdentity + + The InstalledSoftwareIdentity association identifies the System on + which a SoftwareIdentity is installed. This class is a corollary to + InstalledSoftwareElement, but deals with the asset aspects of software + (as indicated by SoftwareIdentity), versus the deployment aspects (as + indicated by SoftwareElement). + + """ + + def __init__ (self, _env): + cmpi_logging.logger.debug('Initializing provider %s from %s' \ + % (self.__class__.__name__, __file__)) + + @cmpi_logging.trace_method + def get_instance(self, env, model): + """Return an instance. + + Keyword arguments: + env -- Provider Environment (pycimmb.ProviderEnvironment) + model -- A template of the pywbem.CIMInstance to be returned. The + key properties are set on this instance to correspond to the + instanceName that was requested. The properties of the model + are already filtered according to the PropertyList from the + request. Only properties present in the model need to be + given values. If you prefer, you can set all of the + values, and the instance will be filtered for you. + + Possible Errors: + CIM_ERR_ACCESS_DENIED + CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized + or otherwise incorrect parameters) + CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM + Instance does not exist in the specified namespace) + CIM_ERR_FAILED (some other unspecified error occurred) + + """ + ComputerSystem.check_path_property(env, model, 'System') + + if not isinstance(model['InstalledSoftware'], pywbem.CIMInstanceName): + raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, + "Expected object path for InstalledSoftware!") + + model["System"] = model.path["System"] = ComputerSystem.get_path() + with YumDB.get_instance(): + pkg_info = SoftwareIdentity.object_path2pkg( + model['InstalledSoftware'], kind='installed') + model['InstalledSoftware'] = SoftwareIdentity.pkg2model(pkg_info) + return model + + @cmpi_logging.trace_method + def enum_instances(self, env, model, keys_only): + """Enumerate instances. + + The WBEM operations EnumerateInstances and EnumerateInstanceNames + are both mapped to this method. + This method is a python generator + + Keyword arguments: + env -- Provider Environment (pycimmb.ProviderEnvironment) + model -- A template of the pywbem.CIMInstances to be generated. + The properties of the model are already filtered according to + the PropertyList from the request. Only properties present in + the model need to be given values. If you prefer, you can + always set all of the values, and the instance will be filtered + for you. + keys_only -- A boolean. True if only the key properties should be + set on the generated instances. + + Possible Errors: + CIM_ERR_FAILED (some other unspecified error occurred) + """ + # Prime model.path with knowledge of the keys, so key values on + # the CIMInstanceName (model.path) will automatically be set when + # we set property values on the model. + model.path.update({'InstalledSoftware': None, 'System': None}) + + model['System'] = ComputerSystem.get_path() + inst_model = pywbem.CIMInstanceName( + classname="LMI_SoftwareIdentity", + namespace="root/cimv2") + with YumDB.get_instance() as yb: + pl = yb.get_package_list('installed', sort=True) + for pkg in pl: + model['InstalledSoftware'] = SoftwareIdentity.pkg2model( + pkg, model=inst_model) + yield model + + @cmpi_logging.trace_method + def set_instance(self, env, instance, modify_existing): + """Return a newly created or modified instance. + + Keyword arguments: + env -- Provider Environment (pycimmb.ProviderEnvironment) + instance -- The new pywbem.CIMInstance. If modifying an existing + instance, the properties on this instance have been filtered by + the PropertyList from the request. + modify_existing -- True if ModifyInstance, False if CreateInstance + + Return the new instance. The keys must be set on the new instance. + + Possible Errors: + CIM_ERR_ACCESS_DENIED + CIM_ERR_NOT_SUPPORTED + CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized + or otherwise incorrect parameters) + CIM_ERR_ALREADY_EXISTS (the CIM Instance already exists -- only + valid if modify_existing is False, indicating that the operation + was CreateInstance) + CIM_ERR_NOT_FOUND (the CIM Instance does not exist -- only valid + if modify_existing is True, indicating that the operation + was ModifyInstance) + CIM_ERR_FAILED (some other unspecified error occurred) + """ + + if modify_existing: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, + "Installed package can not be modified.") + + if not "InstalledSoftware" in instance: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Missing InstalledSoftware property.") + if not "System" in instance: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Missing System property.") + + ComputerSystem.check_path_property(env, instance, 'System') + with YumDB.get_instance() as ydb: + pkg_info = SoftwareIdentity.object_path2pkg( + instance["InstalledSoftware"], kind="all") + if pkg_info.installed: + raise pywbem.CIMError(pywbem.CIM_ERR_ALREADY_EXISTS, + 'Package "%s" is already installed.' % pkg_info) + cmpi_logging.logger.info('installing package "%s"' % pkg_info) + installed = ydb.install_package(pkg_info) + cmpi_logging.logger.info('package "%s" installed' % pkg_info) + instance["InstalledSoftware"] = SoftwareIdentity.pkg2model(installed) + return instance + + @cmpi_logging.trace_method + def delete_instance(self, env, instance_name): + """Delete an instance. + + Keyword arguments: + env -- Provider Environment (pycimmb.ProviderEnvironment) + instance_name -- A pywbem.CIMInstanceName specifying the instance + to delete. + + Possible Errors: + CIM_ERR_ACCESS_DENIED + CIM_ERR_NOT_SUPPORTED + CIM_ERR_INVALID_NAMESPACE + CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized + or otherwise incorrect parameters) + CIM_ERR_INVALID_CLASS (the CIM Class does not exist in the specified + namespace) + CIM_ERR_NOT_FOUND (the CIM Class does exist, but the requested CIM + Instance does not exist in the specified namespace) + CIM_ERR_FAILED (some other unspecified error occurred) + """ + if not "InstalledSoftware" in instance_name: + raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, + "Missing InstalledSoftware property.") + if not "System" in instance_name: + raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, + "Missing System property.") + ComputerSystem.check_path_property(env, instance_name, 'System') + + with YumDB.get_instance() as ydb: + pkg_info = SoftwareIdentity.object_path2pkg( + instance_name["InstalledSoftware"], kind="installed") + cmpi_logging.logger.info('removing package "%s"' % pkg_info) + ydb.remove_package(pkg_info) + cmpi_logging.logger.info('package "%s" removed' % pkg_info) + + @cmpi_logging.trace_method + def references(self, env, object_name, model, result_class_name, role, + result_role, keys_only): + """Instrument Associations. + + All four association-related operations (Associators, AssociatorNames, + References, ReferenceNames) are mapped to this method. + This method is a python generator + + Keyword arguments: + env -- Provider Environment (pycimmb.ProviderEnvironment) + object_name -- A pywbem.CIMInstanceName that defines the source + CIM Object whose associated Objects are to be returned. + model -- A template pywbem.CIMInstance to serve as a model + of the objects to be returned. Only properties present on this + model need to be set. + result_class_name -- If not empty, this string acts as a filter on + the returned set of Instances by mandating that each returned + Instances MUST represent an association between object_name + and an Instance of a Class whose name matches this parameter + or a subclass. + role -- If not empty, MUST be a valid Property name. It acts as a + filter on the returned set of Instances by mandating that each + returned Instance MUST refer to object_name via a Property + whose name matches the value of this parameter. + result_role -- If not empty, MUST be a valid Property name. It acts + as a filter on the returned set of Instances by mandating that + each returned Instance MUST represent associations of + object_name to other Instances, where the other Instances play + the specified result_role in the association (i.e. the + name of the Property in the Association Class that refers to + the Object related to object_name MUST match the value of this + parameter). + keys_only -- A boolean. True if only the key properties should be + set on the generated instances. + + The following diagram may be helpful in understanding the role, + result_role, and result_class_name parameters. + +------------------------+ +-------------------+ + | object_name.classname | | result_class_name | + | ~~~~~~~~~~~~~~~~~~~~~ | | ~~~~~~~~~~~~~~~~~ | + +------------------------+ +-------------------+ + | +-----------------------------------+ | + | | [Association] model.classname | | + | object_name | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | | + +--------------+ object_name.classname REF role | | + (CIMInstanceName) | result_class_name REF result_role +------+ + | |(CIMInstanceName) + +-----------------------------------+ + + Possible Errors: + CIM_ERR_ACCESS_DENIED + CIM_ERR_NOT_SUPPORTED + CIM_ERR_INVALID_NAMESPACE + CIM_ERR_INVALID_PARAMETER (including missing, duplicate, unrecognized + or otherwise incorrect parameters) + CIM_ERR_FAILED (some other unspecified error occurred) + + """ + cimhandle = env.get_cimom_handle() + + model.path.update({'InstalledSoftware': None, 'System': None}) + + with YumDB.get_instance() as ydb: + try: + if ( (not role or role.lower() == 'system') + and cimhandle.is_subclass(object_name.namespace, + sub=object_name.classname, + super='CIM_ComputerSystem')): + ComputerSystem.check_path(env, object_name, "object_name") + + pkg_model = pywbem.CIMInstanceName( + classname='LMI_SoftwareIdentity', + namespace="root/cimv2", + host=model.path.host) + model["System"] = ComputerSystem.get_path() + for pkg_info in ydb.get_package_list('installed', + sort=True): + model["InstalledSoftware"] = \ + SoftwareIdentity.pkg2model( + pkg_info, model=pkg_model) + yield model + + if ( (not role or role.lower() == 'installedsoftware') + and cimhandle.is_subclass(object_name.namespace, + sub=object_name.classname, + super='LMI_SoftwareIdentity')): + pkg_info = SoftwareIdentity.object_path2pkg( + object_name, kind="installed") + model['InstalledSoftware'] = SoftwareIdentity.pkg2model( + pkg_info) + model["System"] = ComputerSystem.get_path() + yield model + + except pywbem.CIMError as exc: + if exc.args[0] != pywbem.CIM_ERR_NOT_FOUND: + raise + diff --git a/src/software/openlmi/software/cimom_entry.py b/src/software/openlmi/software/cimom_entry.py index 156f63a..3b2c921 100644 --- a/src/software/openlmi/software/cimom_entry.py +++ b/src/software/openlmi/software/cimom_entry.py @@ -32,6 +32,8 @@ from openlmi.software.LMI_HostedSoftwareCollection import \ LMI_HostedSoftwareCollection from openlmi.software.LMI_MemberOfSoftwareCollection import \ LMI_MemberOfSoftwareCollection +from openlmi.software.LMI_InstalledSoftwareIdentity import \ + LMI_InstalledSoftwareIdentity from openlmi.software.yumdb import YumDB def get_providers(env): @@ -44,7 +46,8 @@ def get_providers(env): "LMI_SoftwareIdentity" : LMI_SoftwareIdentity(env), "LMI_SystemSoftwareCollection" : LMI_SystemSoftwareCollection(env), "LMI_HostedSoftwareCollection" : LMI_HostedSoftwareCollection(env), - "LMI_MemberOfSoftwareCollection" : LMI_MemberOfSoftwareCollection(env) + "LMI_MemberOfSoftwareCollection" : LMI_MemberOfSoftwareCollection(env), + "LMI_InstalledSoftwareIdentity" : LMI_InstalledSoftwareIdentity(env) } return providers diff --git a/src/software/test/test_installed_software_identity.py b/src/software/test/test_installed_software_identity.py new file mode 100644 index 0000000..f3dec9c --- /dev/null +++ b/src/software/test/test_installed_software_identity.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Unit tests for LMI_InstalledSoftwareIdentity provider. +""" + +import pywbem +import socket +import unittest + +import base +import rpmcache + +class TestInstalledSoftwareIdentity(base.SoftwareBaseTestCase): + """ + Basic cim operations test. + """ + + CLASS_NAME = "LMI_InstalledSoftwareIdentity" + KEYS = ("InstalledSoftware", "System") + + def make_op(self, pkg, newer=True): + """ + @return object path of InstalledSoftwareIdentity association + """ + objpath = self.objpath.copy() + objpath["System"] = pywbem.CIMInstanceName( + classname="Linux_ComputerSystem", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "CreationClassName" : "Linux_ComputerSystem", + "Name" : socket.gethostname() + })) + objpath["InstalledSoftware"] = pywbem.CIMInstanceName( + classname="LMI_SoftwareIdentity", + namespace="root/cimv2", + keybindings=pywbem.NocaseDict({ + "InstanceID" : 'LMI:PKG:' + pkg.get_nevra(newer=newer, + with_epoch="ALWAYS") + })) + return objpath + + def test_get_instance(self): + """ + Tests GetInstance call on packages from our rpm cache. + """ + for pkg in self.safe_pkgs: + objpath = self.make_op(pkg) + inst = self.conn.GetInstance(InstanceName=objpath) + self.assertEqual(inst.path, objpath, + "Object paths should match for package %s"%pkg) + for key in self.KEYS: + self.assertTrue(inst.properties.has_key(key), + "OP is missing \"%s\" key for package %s"%(key, pkg)) + self.assertEqual(inst[key], inst.path[key]) + self.assertEqual(objpath, inst.path, + "Object paths should match for package %s"%pkg) + + # try it now with CIM_ComputerSystem + system = objpath["System"].copy() + objpath["System"].classname = "CIM_ComputerSystem" + objpath["System"]["CreationClassName"] = "CIM_ComputerSystem" + + inst = self.conn.GetInstance(InstanceName=objpath) + self.assertEqual(objpath, inst.path, + "Object paths should match for package %s"%pkg) + for key in self.KEYS: + self.assertTrue(inst.properties.has_key(key), + "OP is missing \"%s\" key for package %s"%(key, pkg)) + self.assertEqual(system, inst["System"]) + self.assertEqual(objpath, inst.path) + + @base.mark_tedious + def test_enum_instance_names_safe(self): + """ + Tests EnumInstanceNames call on installed packages. + """ + inames = self.conn.EnumerateInstanceNames(ClassName=self.CLASS_NAME) + self.assertGreater(len(inames), 0) + objpath = self.make_op(self.safe_pkgs[0]) + for iname in inames: + self.assertIsInstance(iname, pywbem.CIMInstanceName) + self.assertEqual(iname.namespace, 'root/cimv2') + self.assertEqual(iname.classname, self.CLASS_NAME) + self.assertEqual(sorted(iname.keys()), sorted(self.KEYS)) + self.assertEqual(objpath["System"], iname["System"]) + nevra_set = set(i["InstalledSoftware"]["InstanceID"] for i in inames) + for pkg in self.safe_pkgs: + nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS") + self.assertTrue(nevra in nevra_set, + 'Missing nevra "%s".' % nevra) + + @base.mark_tedious + def test_enum_instances(self): + """ + Tests EnumInstances call on installed packages. + """ + insts = self.conn.EnumerateInstances(ClassName=self.CLASS_NAME) + self.assertGreater(len(insts), 0) + for inst in insts: + self.assertIsInstance(inst, pywbem.CIMInstance) + self.assertEqual(inst.path.namespace, 'root/cimv2') + self.assertEqual(inst.path.classname, self.CLASS_NAME) + self.assertEqual(inst.classname, self.CLASS_NAME) + self.assertEqual(sorted(inst.keys()), sorted(self.KEYS)) + self.assertEqual(sorted(inst.path.keys()), sorted(self.KEYS)) + for key in self.KEYS: + self.assertEqual(inst[key], inst.path[key]) + nevra_set = set(i["InstalledSoftware"]["InstanceID"] for i in insts) + for pkg in self.safe_pkgs: + nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS") + self.assertTrue(nevra in nevra_set, "Missing pkg %s in nevra_set." + % nevra) + + @base.mark_tedious + @base.mark_dangerous + def test_enum_instance_names(self): + """ + Tests EnumInstanceNames call on dangerous packages. + """ + pkg = self.dangerous_pkgs[0] + + rpmcache.ensure_pkg_installed(pkg) + + inames1 = self.conn.EnumerateInstanceNames( + ClassName=self.CLASS_NAME) + self.assertGreater(len(inames1), 1) + self.assertIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"), + set(i["InstalledSoftware"]["InstanceID"] for i in inames1)) + + rpmcache.remove_pkg(pkg.name) + inames2 = self.conn.EnumerateInstanceNames( + ClassName=self.CLASS_NAME) + self.assertEqual(len(inames1), len(inames2) + 1) + self.assertNotIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"), + set(i["InstalledSoftware"]["InstanceID"] for i in inames2)) + + rpmcache.install_pkg(pkg) + inames3 = self.conn.EnumerateInstanceNames( + ClassName=self.CLASS_NAME) + self.assertEqual(len(inames1), len(inames3)) + self.assertIn('LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS"), + set(i["InstalledSoftware"]["InstanceID"] for i in inames3)) + + @base.mark_dangerous + def test_create_instance(self): + """ + Tests new instance creation for dangerous packages. + """ + for pkg in self.dangerous_pkgs: + if rpmcache.is_pkg_installed(pkg.name): + rpmcache.remove_pkg(pkg.name) + objpath = self.make_op(pkg) + self.assertFalse(rpmcache.is_pkg_installed(pkg)) + + inst = pywbem.CIMInstance(self.CLASS_NAME, + properties={ + "InstalledSoftware" : objpath["InstalledSoftware"], + "System" : objpath["System"]}, + path=objpath) + iname = self.conn.CreateInstance(NewInstance=inst) + self.assertTrue(rpmcache.is_pkg_installed(pkg)) + self.assertEqual(objpath, iname, + 'Object path does not match for %s.' % pkg) + + # try to install second time + with self.assertRaises(pywbem.CIMError) as cm: + self.conn.CreateInstance(NewInstance=inst) + self.assertEqual(cm.exception.args[0], + pywbem.CIM_ERR_ALREADY_EXISTS) + + @base.mark_dangerous + def test_delete_instance(self): + """ + Tests removing of dangerous packages by deleting instance name. + """ + for pkg in self.dangerous_pkgs: + self.ensure_pkg_installed(pkg) + self.assertTrue(rpmcache.is_pkg_installed(pkg)) + objpath = self.make_op(pkg) + self.conn.DeleteInstance(InstanceName=objpath) + self.assertFalse(rpmcache.is_pkg_installed(pkg), + "Failed to delete instance for %s." % pkg) + + with self.assertRaises(pywbem.CIMError) as cm: + self.conn.DeleteInstance(InstanceName=objpath) + self.assertEqual(cm.exception.args[0], + pywbem.CIM_ERR_NOT_FOUND) + + @base.mark_tedious + def test_get_system_referents(self): + """ + Test ReferenceNames for ComputerSystem. + """ + objpath = self.make_op(self.safe_pkgs[0]) + refs = self.conn.AssociatorNames( + Role="System", + ObjectName=objpath["System"], + ResultRole="InstalledSoftware", + ResultClass="LMI_SoftwareIdentity") + self.assertGreater(len(refs), 0) + for ref in refs: + self.assertIsInstance(ref, pywbem.CIMInstanceName) + self.assertEqual(ref.namespace, 'root/cimv2') + self.assertEqual(ref.classname, "LMI_SoftwareIdentity") + self.assertEqual(sorted(ref.keys()), ["InstanceID"]) + self.assertTrue(ref["InstanceID"].startswith("LMI:PKG:")) + + nevra_set = set(i["InstanceID"] for i in refs) + # NOTE: installed packages might not be available + for pkg in self.safe_pkgs: + nevra = 'LMI:PKG:'+pkg.get_nevra(with_epoch="ALWAYS") + self.assertTrue(nevra in nevra_set, + 'Missing nevra "%s".' % nevra) + + def test_get_installed_software_referents(self): + """ + Test ReferenceNames for SoftwareIdentity. + """ + for pkg in self.safe_pkgs: + objpath = self.make_op(pkg) + refs = self.conn.AssociatorNames( + ObjectName=objpath["InstalledSoftware"], + Role="InstalledSoftware", + ResultRole="System", + ResultClass="Linux_ComputerSystem") + self.assertEqual(len(refs), 1) + ref = refs[0] + self.assertEqual(objpath["System"], ref) + +def suite(): + """For unittest loaders.""" + return unittest.TestLoader().loadTestsFromTestCase( + TestInstalledSoftwareIdentity) + +if __name__ == '__main__': + unittest.main() |