summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2012-11-29 11:20:21 +0100
committerMichal Minar <miminar@redhat.com>2012-11-29 11:20:21 +0100
commitf1a2bbd99380602bc4d33833f1bc78e38899ae08 (patch)
treec17ef006cd413cc655c3390b4a8f088968e81951 /src
parent148b5b7d18c449b87b1103c05612ff2a2fe83233 (diff)
downloadopenlmi-providers-f1a2bbd99380602bc4d33833f1bc78e38899ae08.tar.gz
openlmi-providers-f1a2bbd99380602bc4d33833f1bc78e38899ae08.tar.xz
openlmi-providers-f1a2bbd99380602bc4d33833f1bc78e38899ae08.zip
fixed most of pylint errors
also modified mof, so it's better parseable greatly reduced pylint errors and warnings LMI_SoftwareInstalledPackage now returns reference to Linux_ComputerSystem instead of CIM_ComputerSystem
Diffstat (limited to 'src')
-rw-r--r--src/software/openlmi/software/LMI_SoftwareFileCheck.py47
-rw-r--r--src/software/openlmi/software/LMI_SoftwareInstalledPackage.py104
-rw-r--r--src/software/openlmi/software/LMI_SoftwarePackage.py55
-rw-r--r--src/software/openlmi/software/LMI_SoftwarePackageChecks.py63
-rw-r--r--src/software/openlmi/software/util/common.py665
-rwxr-xr-xsrc/software/test/test_software_installed_package.py6
6 files changed, 457 insertions, 483 deletions
diff --git a/src/software/openlmi/software/LMI_SoftwareFileCheck.py b/src/software/openlmi/software/LMI_SoftwareFileCheck.py
index c667352..63070fc 100644
--- a/src/software/openlmi/software/LMI_SoftwareFileCheck.py
+++ b/src/software/openlmi/software/LMI_SoftwareFileCheck.py
@@ -27,12 +27,10 @@ Instruments the CIM class LMI_SoftwareFileCheck
import pywbem
from pywbem.cim_provider2 import CIMProvider2
-from openlmi.software.util.common import *
+from openlmi.software.util import common
+from openlmi.software.util.common import SoftwareFileCheck
-filecheck2model = SoftwareFileCheck.filecheck_wrapper(
- 'root/cimv2', 'LMI_SoftwareFileCheck')
-
-class LMI_SoftwareFileCheck(CIMProvider2):
+class LMI_SoftwareFileCheck(CIMProvider2): # pylint: disable=R0904
"""Instrument the CIM class LMI_SoftwareFileCheck
Identifies a file contained by RPM package. It's located in directory
@@ -41,7 +39,7 @@ class LMI_SoftwareFileCheck(CIMProvider2):
"""
- def __init__ (self, env):
+ def __init__(self, env):
logger = env.get_logger()
logger.log_debug('Initializing provider %s from %s' \
% (self.__class__.__name__, __file__))
@@ -74,11 +72,10 @@ class LMI_SoftwareFileCheck(CIMProvider2):
logger.log_debug('Entering %s.get_instance()' \
% self.__class__.__name__)
- with YumDB.getInstance(env) as yb:
+ with common.YumDB.getInstance(env):
vpkg = SoftwareFileCheck.object_path2yumcheck(env, model.path)
- pkg = vpkg.po
- fi = pkg.hdr.fiFromHeader()
- return filecheck2model(vpkg, model['Name'], env, False)
+ return SoftwareFileCheck.filecheck2model(
+ vpkg, model['Name'], env, False)
def enum_instances(self, env, model, keys_only):
"""Enumerate instances.
@@ -106,8 +103,8 @@ class LMI_SoftwareFileCheck(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.enum_instances()' \
% self.__class__.__name__)
-
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
+ # this won't be supported because of enormous amount of data
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def set_instance(self, env, instance, modify_existing):
"""Return a newly created or modified instance.
@@ -139,9 +136,7 @@ class LMI_SoftwareFileCheck(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.set_instance()' \
% self.__class__.__name__)
- # TODO create or modify the instance
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
- return instance
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def delete_instance(self, env, instance_name):
"""Delete an instance.
@@ -169,8 +164,7 @@ class LMI_SoftwareFileCheck(CIMProvider2):
logger.log_debug('Entering %s.delete_instance()' \
% self.__class__.__name__)
- # TODO delete the resource
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def cim_method_invoke(self, env, object_name):
"""Implements LMI_SoftwareFileCheck.Invoke()
@@ -214,17 +208,18 @@ class LMI_SoftwareFileCheck(CIMProvider2):
logger.log_debug('Entering %s.cim_method_invoke()' \
% self.__class__.__name__)
- with YumDB.getInstance(env) as yb:
+ with common.YumDB.getInstance(env):
vpkg = SoftwareFileCheck.object_path2yumcheck(env, object_name)
- fc = SoftwareFileCheck.test_file(env,
+ sfc = SoftwareFileCheck.test_file(env,
SoftwareFileCheck.pkg_checksum_type(vpkg.po),
vpkg._files[object_name["Name"]])
out_params = []
- ret = 0 if SoftwareFileCheck.filecheck_passed(fc) else 2
+ ret = 0 if SoftwareFileCheck.filecheck_passed(sfc) else 2
return (pywbem.Uint32(ret), out_params)
- def cim_method_invokeonsystem(self, env, object_name,
- param_targetsystem=None):
+ def cim_method_invokeonsystem(self, env,
+ object_name, #pylint: disable=W0613
+ param_targetsystem=None): #pylint: disable=W0613
"""Implements LMI_SoftwareFileCheck.InvokeOnSystem()
The InvokeOnSystem method evaluates this Check. The details of the
@@ -270,10 +265,7 @@ class LMI_SoftwareFileCheck(CIMProvider2):
% self.__class__.__name__)
# TODO do something
- raise pywbem.CIMError(pywbem.CIM_ERR_METHOD_NOT_AVAILABLE) # Remove to implemented
- out_params = []
- #rval = # TODO (type pywbem.Uint32)
- return (rval, out_params)
+ raise pywbem.CIMError(pywbem.CIM_ERR_METHOD_NOT_AVAILABLE)
class Values(object):
class TargetOperatingSystem(object):
@@ -406,8 +398,7 @@ class LMI_SoftwareFileCheck(CIMProvider2):
## end of class LMI_SoftwareFileCheckProvider
-## get_providers() for associating CIM Class Name to python provider class name
-
def get_providers(env):
+ """for associating CIM Class Name to python provider class name"""
lmi_softwarefilecheck_prov = LMI_SoftwareFileCheck(env)
return {'LMI_SoftwareFileCheck': lmi_softwarefilecheck_prov}
diff --git a/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py b/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py
index 51d15b6..b59121d 100644
--- a/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py
+++ b/src/software/openlmi/software/LMI_SoftwareInstalledPackage.py
@@ -28,12 +28,14 @@ Instruments the CIM class LMI_SoftwareInstalledPackage
import itertools
import pywbem
+import yum
from pywbem.cim_provider2 import CIMProvider2
-from openlmi.software.LMI_SoftwarePackage import pkg2model, LMI_SoftwarePackage
-from openlmi.software.LMI_SoftwareFileCheck import filecheck2model
-from openlmi.software.util.common import *
+from openlmi.software.LMI_SoftwarePackage import LMI_SoftwarePackage
+from openlmi.software.util import common
+from openlmi.software.util.common import (
+ YumDB, SoftwarePackage, SoftwareFileCheck)
-class LMI_SoftwareInstalledPackage(CIMProvider2):
+class LMI_SoftwareInstalledPackage(CIMProvider2): #pylint: disable=R0904
"""Instrument the CIM class LMI_SoftwareInstalledPackage
The InstalledSoftwareElement association allows the identification of
@@ -79,10 +81,11 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
if not "System" in model:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing System property.")
- check_computer_system_op(env, model['System'])
+ common.check_computer_system_op(env, model['System'])
+ model['System'] = common.get_computer_system_op()
with YumDB.getInstance(env):
pkg = SoftwarePackage.object_path2pkg(env, model['Software'])
- model['Software'] = pkg2model(env, pkg, True)
+ model['Software'] = SoftwarePackage.pkg2model(env, pkg, True)
return model
def enum_instances(self, env, model, keys_only):
@@ -120,19 +123,20 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
yum_package_path = pywbem.CIMInstanceName("LMI_SoftwarePackage",
namespace=model.path.namespace,
host=model.path.host)
- model['System'] = get_computer_system_op()
- with YumDB.getInstance(env) as yb:
- for rpm in yb.rpmdb:
- iname = pkg2model(env, rpm, True, yum_package_path)
+ model['System'] = common.get_computer_system_op()
+ with YumDB.getInstance(env) as ydb:
+ for pkg in ydb.rpmdb:
+ iname = SoftwarePackage.pkg2model(env, pkg,
+ True, yum_package_path)
model['Software'] = iname
if keys_only:
yield model
else:
try:
yield self.get_instance(env, model)
- except pywbem.CIMError, (num, msg):
- if num not in (pywbem.CIM_ERR_NOT_FOUND,
- pywbem.CIM_ERR_ACCESS_DENIED):
+ except pywbem.CIMError as error:
+ if error.args[0] not in (pywbem.CIM_ERR_NOT_FOUND,
+ pywbem.CIM_ERR_ACCESS_DENIED):
raise
def set_instance(self, env, instance, modify_existing):
@@ -177,27 +181,28 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
if not "System" in instance:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing System property.")
- check_computer_system_op(env, instance['System'])
+ common.check_computer_system_op(env, instance['System'])
- with YumDB.getInstance(env) as yb:
- pkg = SoftwarePackage.object_path2pkg_search(env, instance['Software'])
+ with YumDB.getInstance(env) as ydb:
+ pkg = SoftwarePackage.object_path2pkg_search(
+ env, instance['Software'])
if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
raise pywbem.CIMError(pywbem.CIM_ERR_ALREADY_EXISTS,
"Package is already installed.")
logger.log_info('installing package {}'.format(pkg.nevra))
# install
- yb.install(pkg)
- yb.buildTransaction()
- yb.processTransaction()
+ ydb.install(pkg)
+ ydb.buildTransaction()
+ ydb.processTransaction()
logger.log_info('package installed'.format(pkg.nevra))
# return instance
- pkg_iname = pkg2model(env, pkg, True)
+ pkg_iname = SoftwarePackage.pkg2model(env, pkg, True)
pkg_iname["SoftwareElementState"] = \
LMI_SoftwarePackage.Values.SoftwareElementState.Executable
instance["Software"] = pkg_iname
- return LMI_SoftwareInstalledPackage(env).get_instance(env, instance)
+ return self.get_instance(env, instance)
def delete_instance(self, env, instance_name):
"""Delete an instance.
@@ -231,13 +236,14 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
if not "System" in instance_name:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing System property.")
- check_computer_system_op(env, instance_name['System'])
- with YumDB.getInstance(env) as yb:
- pkg = SoftwarePackage.object_path2pkg(env, instance_name["Software"])
+ common.check_computer_system_op(env, instance_name['System'])
+ with YumDB.getInstance(env) as ydb:
+ pkg = SoftwarePackage.object_path2pkg(
+ env, instance_name["Software"])
logger.log_info('removing package "%s"' % pkg.nevra)
- yb.remove(pkg)
- yb.buildTransaction()
- yb.processTransaction()
+ ydb.remove(pkg)
+ ydb.buildTransaction()
+ ydb.processTransaction()
logger.log_info('package "%s" removed' % pkg.nevra)
def references(self, env, object_name, model, result_class_name, role,
@@ -302,14 +308,14 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.references()' \
% self.__class__.__name__)
- ch = env.get_cimom_handle()
+ cimhandle = env.get_cimom_handle()
# If you want to get references for free, implemented in terms
# of enum_instances, just leave the code below unaltered.
- if ch.is_subclass(object_name.namespace,
+ if cimhandle.is_subclass(object_name.namespace,
sub=object_name.classname,
super='CIM_ComputerSystem') or \
- ch.is_subclass(object_name.namespace,
+ cimhandle.is_subclass(object_name.namespace,
sub=object_name.classname,
super='LMI_SoftwarePackage'):
return self.simple_refs(env, object_name, model,
@@ -363,16 +369,18 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
if not "System" in object_name:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing System property.")
- with YumDB.getInstance(env) as yb:
+ with YumDB.getInstance(env):
pkg = SoftwarePackage.object_path2pkg(env, object_name['Software'])
csum = SoftwareFileCheck.pkg_checksum_type(pkg)
vpkg = yum.packages._RPMVerifyPackage(
pkg, pkg.hdr.fiFromHeader(), csum, [], True)
for vpf in vpkg:
- fc = SoftwareFileCheck.test_file(env, csum, vpf)
- if SoftwareFileCheck.filecheck_passed(fc): continue
- failed.append(filecheck2model(
- vpkg, vpf.filename, env, keys_only=True, fc=fc))
+ file_check = SoftwareFileCheck.test_file(env, csum, vpf)
+ if SoftwareFileCheck.filecheck_passed(file_check):
+ continue
+ failed.append(SoftwareFileCheck.filecheck2model(
+ vpkg, vpf.filename, env, keys_only=True,
+ file_check=file_check))
out_params = [ pywbem.CIMParameter('Failed', type='reference',
value=failed) ]
return ( getattr(self.Values.CheckIntegrity,
@@ -433,9 +441,9 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
if not "System" in object_name:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing System property.")
- check_computer_system_op(env, object_name['System'])
+ common.check_computer_system_op(env, object_name['System'])
- with YumDB.getInstance(env) as yb:
+ with YumDB.getInstance(env) as ydb:
orig = SoftwarePackage.object_path2pkg(env, object_name['Software'])
evr_str = []
@@ -443,7 +451,7 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
('epoch', param_epoch),
('version', param_version),
('release', param_release)):
- evr_str.append("%s(%s)"%(name,param))
+ evr_str.append("%s(%s)"%(name, param))
if len(evr_str):
evr_str = "specific "+'-'.join(evr_str)
else:
@@ -452,9 +460,9 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
logger.log_info('trying to update to %s of package \"%s\"' %
(evr_str, object_name["Software"]["Name"]))
- pl = yb.doPackageLists('all', showdups=True)
- exact,_,_ = yum.packages.parsePackages(
- itertools.chain(pl.available, pl.installed),
+ pkglist = ydb.doPackageLists('all', showdups=True)
+ exact, _, _ = yum.packages.parsePackages(
+ itertools.chain(pkglist.available, pkglist.installed),
[orig.name])
# NOTE: available ∩ installed = ∅
exact = sorted(exact, key=lambda a:a.evra)
@@ -469,18 +477,18 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
'could not find any matching available package')
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND)
out_params = [pywbem.CIMParameter('Installed', type='reference',
- value=pkg2model(env, pkg, True))]
+ value=SoftwarePackage.pkg2model(env, pkg, True))]
if orig.evra == pkg.evra:
logger.log_info('already up to date')
return (self.Values.Update.Already_newest, out_params)
- yb.update(update_to=True,
+ ydb.update(update_to=True,
name=pkg.name,
epoch=pkg.epoch,
version=pkg.version,
release=pkg.release)
- yb.buildTransaction()
- yb.processTransaction()
+ ydb.buildTransaction()
+ ydb.processTransaction()
logger.log_info('package {} updated to: {}'.format(
orig.name, pkg.evra))
@@ -502,8 +510,8 @@ class LMI_SoftwareInstalledPackage(CIMProvider2):
## end of class LMI_SoftwareInstalledPackage
-## get_providers() for associating CIM Class Name to python provider class name
def get_providers(env):
- lmi_softwareinstalledpackage_prov = LMI_SoftwareInstalledPackage(env)
- return {'LMI_SoftwareInstalledPackage': lmi_softwareinstalledpackage_prov}
+ """Associates CIM Class Name to python provider class name"""
+ lmi_sip_prov = LMI_SoftwareInstalledPackage(env)
+ return {'LMI_SoftwareInstalledPackage': lmi_sip_prov}
diff --git a/src/software/openlmi/software/LMI_SoftwarePackage.py b/src/software/openlmi/software/LMI_SoftwarePackage.py
index 2395dca..01956ce 100644
--- a/src/software/openlmi/software/LMI_SoftwarePackage.py
+++ b/src/software/openlmi/software/LMI_SoftwarePackage.py
@@ -27,14 +27,12 @@ Instruments the CIM class LMI_SoftwarePackage
"""
import itertools
-import datetime
import pywbem
+import yum
from pywbem.cim_provider2 import CIMProvider2
-from openlmi.software.util.common import *
+from openlmi.software.util.common import (YumDB, SoftwarePackage)
-pkg2model = SoftwarePackage.pkg2model_wrapper('root/cimv2', "LMI_SoftwarePackage")
-
-class LMI_SoftwarePackage(CIMProvider2):
+class LMI_SoftwarePackage(CIMProvider2): #pylint: disable=R0904
"""Instrument the CIM class LMI_SoftwarePackage
RPM package installed on particular computer system with YUM (The
@@ -42,7 +40,7 @@ class LMI_SoftwarePackage(CIMProvider2):
"""
- def __init__ (self, env):
+ def __init__(self, env):
logger = env.get_logger()
logger.log_debug('Initializing provider %s from %s' \
% (self.__class__.__name__, __file__))
@@ -76,7 +74,8 @@ class LMI_SoftwarePackage(CIMProvider2):
with YumDB.getInstance(env):
pkg = SoftwarePackage.object_path2pkg(env, model.path, 'all')
- return pkg2model(env, pkg, keys_only=False, model=model)
+ return SoftwarePackage.pkg2model(env, pkg,
+ keys_only=False, model=model)
def enum_instances(self, env, model, keys_only):
"""Enumerate instances.
@@ -112,13 +111,13 @@ class LMI_SoftwarePackage(CIMProvider2):
'SoftwareElementState': None, 'Name': None,
'SoftwareElementID': None})
- with YumDB.getInstance(env) as yb:
+ with YumDB.getInstance(env) as ydb:
# get all packages
- pl = yb.doPackageLists('all', showdups=True)
- pl = itertools.chain(pl.installed, pl.available)
+ pkglist = ydb.doPackageLists('all', showdups=True)
+ pkglist = itertools.chain(pkglist.installed, pkglist.available)
# NOTE: available ∩ installed = ∅
- for pkg in sorted(pl, key=lambda a:a.evra):
- yield pkg2model(env, pkg, keys_only, model)
+ for pkg in sorted(pkglist, key=lambda a:a.evra):
+ yield SoftwarePackage.pkg2model(env, pkg, keys_only, model)
def set_instance(self, env, instance, modify_existing):
"""Return a newly created or modified instance.
@@ -150,9 +149,7 @@ class LMI_SoftwarePackage(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.set_instance()' \
% self.__class__.__name__)
- # TODO create or modify the instance
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
- return instance
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def delete_instance(self, env, instance_name):
"""Delete an instance.
@@ -179,9 +176,6 @@ class LMI_SoftwarePackage(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.delete_instance()' \
% self.__class__.__name__)
-
- # TODO delete the resource
- # Remove to implement
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def cim_method_install(self, env, object_name):
@@ -219,22 +213,23 @@ class LMI_SoftwarePackage(CIMProvider2):
logger.log_debug('Entering %s.cim_method_install()' \
% self.__class__.__name__)
- with YumDB.getInstance(env) as yb:
+ with YumDB.getInstance(env) as ydb:
# get available packages
pkg = SoftwarePackage.object_path2pkg_search(env, object_name)
out_params = [ pywbem.CIMParameter('Installed', type='reference') ]
if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- out_params[0].value = pkg2model(env, pkg, True)
+ out_params[0].value = SoftwarePackage.pkg2model(env, pkg, True)
return (self.Values.Install.Already_installed, out_params)
logger.log_info('installing package {}'.format(pkg.nevra))
# install
- yb.install(pkg)
- yb.buildTransaction()
- yb.processTransaction()
+ ydb.install(pkg)
+ ydb.buildTransaction()
+ ydb.processTransaction()
logger.log_info('package installed'.format(pkg.nevra))
- out_params[0].value = pkg2model(env, pkg, True, object_name)
+ out_params[0].value = SoftwarePackage.pkg2model(
+ env, pkg, True, object_name)
out_params[0].value['SoftwareElementState'] = \
self.Values.SoftwareElementState.Executable
return (self.Values.Install.Successful_installation, out_params)
@@ -271,18 +266,17 @@ class LMI_SoftwarePackage(CIMProvider2):
logger.log_debug('Entering %s.cim_method_remove()' \
% self.__class__.__name__)
- with YumDB.getInstance(env) as yb:
+ with YumDB.getInstance(env) as ydb:
pkg = SoftwarePackage.object_path2pkg(env, object_name, 'all')
if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
logger.log_info('removing package "%s"' % pkg.nevra)
- yb.remove(pkg)
- yb.buildTransaction()
- yb.processTransaction()
+ ydb.remove(pkg)
+ ydb.buildTransaction()
+ ydb.processTransaction()
logger.log_info('package "%s" removed' % pkg.nevra)
rval = self.Values.Remove.Successful_removal
else:
rval = self.Values.Remove.Not_installed
- #rval = # TODO (type pywbem.Uint32 self.Values.Remove)
return (rval, [])
class Values(object):
@@ -515,8 +509,7 @@ class LMI_SoftwarePackage(CIMProvider2):
## end of class LMI_SoftwarePackage
-## get_providers() for associating CIM Class Name to python provider class name
-
def get_providers(env):
+ """Associates CIM Class Name to python provider class name"""
lmi_softwarepackage_prov = LMI_SoftwarePackage(env)
return {'LMI_SoftwarePackage': lmi_softwarepackage_prov}
diff --git a/src/software/openlmi/software/LMI_SoftwarePackageChecks.py b/src/software/openlmi/software/LMI_SoftwarePackageChecks.py
index a2f798b..facf90e 100644
--- a/src/software/openlmi/software/LMI_SoftwarePackageChecks.py
+++ b/src/software/openlmi/software/LMI_SoftwarePackageChecks.py
@@ -26,12 +26,12 @@ Instruments the CIM class LMI_SoftwarePackageChecks
"""
import pywbem
+import yum
from pywbem.cim_provider2 import CIMProvider2
-from openlmi.software.LMI_SoftwareFileCheck import filecheck2model
-from openlmi.software.LMI_SoftwarePackage import pkg2model
-from openlmi.software.util.common import *
+from openlmi.software.util import common
+from openlmi.software.util.common import (SoftwarePackage, SoftwareFileCheck)
-class LMI_SoftwarePackageChecks(CIMProvider2):
+class LMI_SoftwarePackageChecks(CIMProvider2): #pylint: disable=R0904
"""Instrument the CIM class LMI_SoftwarePackageChecks
This association ties a SoftwareElement to a specific Check to validate
@@ -81,9 +81,11 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Missing Element property.")
vpkg = SoftwareFileCheck.object_path2yumcheck(env, model['Check'])
- model['Check'] = filecheck2model(vpkg, model['Check']['Name'],
+ model['Check'] = SoftwareFileCheck.filecheck2model(
+ vpkg, model['Check']['Name'],
env, keys_only=True)
- model['Element'] = pkg2model(env, vpkg.po, keys_only=True)
+ model['Element'] = SoftwarePackage.pkg2model(
+ env, vpkg.po, keys_only=True)
return model
def enum_instances(self, env, model, keys_only):
@@ -112,26 +114,8 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.enum_instances()' \
% self.__class__.__name__)
-
- # Prime model.path with knowledge of the keys, so key values on
- # the CIMInstanceName (model.path) will automatically be set when
- # we set property values on the model.
- model.path.update({'Check': None, 'Element': None})
-
- while False: # TODO more instances?
- # TODO fetch system resource
- # Key properties
- #model['Check'] = pywbem.CIMInstanceName(classname='LMI_SoftwareFileCheck', ...) # TODO (type = REF (pywbem.CIMInstanceName(classname='LMI_SoftwareFileCheck', ...))
- #model['Element'] = pywbem.CIMInstanceName(classname='LMI_SoftwarePackage', ...) # TODO (type = REF (pywbem.CIMInstanceName(classname='LMI_SoftwarePackage', ...))
- if keys_only:
- yield model
- else:
- try:
- yield self.get_instance(env, model)
- except pywbem.CIMError, (num, msg):
- if num not in (pywbem.CIM_ERR_NOT_FOUND,
- pywbem.CIM_ERR_ACCESS_DENIED):
- raise
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
+ "Enumeration of instances is not supported.")
def set_instance(self, env, instance, modify_existing):
"""Return a newly created or modified instance.
@@ -163,9 +147,7 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.set_instance()' \
% self.__class__.__name__)
- # TODO create or modify the instance
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
- return instance
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def delete_instance(self, env, instance_name):
"""Delete an instance.
@@ -192,9 +174,7 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.delete_instance()' \
% self.__class__.__name__)
-
- # TODO delete the resource
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) # Remove to implement
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED)
def references(self, env, object_name, model, result_class_name, role,
result_role, keys_only):
@@ -258,16 +238,16 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
logger = env.get_logger()
logger.log_debug('Entering %s.references()' \
% self.__class__.__name__)
- ch = env.get_cimom_handle()
+ cimhandle = env.get_cimom_handle()
# Prime model.path with knowledge of the keys, so key values on
# the CIMInstanceName (model.path) will automatically be set when
# we set property values on the model.
model.path.update({'Check': None, 'Element': None})
- with YumDB.getInstance(env):
+ with common.YumDB.getInstance(env):
if ( (not role or role.lower() == 'element')
- and ch.is_subclass(object_name.namespace,
+ and cimhandle.is_subclass(object_name.namespace,
sub=object_name.classname,
super='LMI_SoftwarePackage')):
filecheck_model = pywbem.CIMInstanceName(
@@ -280,20 +260,20 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
vpkg = yum.packages._RPMVerifyPackage(
pkg, pkg.hdr.fiFromHeader(),
SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
- for fc in vpkg:
- model['Check'] = filecheck2model(
- vpkg, fc.filename, env, keys_only=True,
+ for file_check in vpkg:
+ model['Check'] = SoftwareFileCheck.filecheck2model(
+ vpkg, file_check.filename, env, keys_only=True,
model=filecheck_model)
yield model
if ( (not role or role.lower() == 'check')
- and ch.is_subclass(object_name.namespace,
+ and cimhandle.is_subclass(object_name.namespace,
sub=object_name.classname,
super='LMI_SoftwareFileCheck')):
model['Check'] = object_name
vpkg = SoftwareFileCheck.object_path2yumcheck(env, object_name)
- model['Element'] = pkg2model(
+ model['Element'] = SoftwarePackage.pkg2model(
env, vpkg.po, keys_only=True)
yield model
@@ -304,8 +284,7 @@ class LMI_SoftwarePackageChecks(CIMProvider2):
## end of class LMI_SoftwarePackageChecksProvider
-## get_providers() for associating CIM Class Name to python provider class name
-
def get_providers(env):
+ """Associates CIM Class Name to python provider class name"""
lmi_softwarepackagechecks_prov = LMI_SoftwarePackageChecks(env)
return {'LMI_SoftwarePackageChecks': lmi_softwarepackagechecks_prov}
diff --git a/src/software/openlmi/software/util/common.py b/src/software/openlmi/software/util/common.py
index 7b54f1e..7deef32 100644
--- a/src/software/openlmi/software/util/common.py
+++ b/src/software/openlmi/software/util/common.py
@@ -40,15 +40,19 @@ import yum
import cmpi_pywbem_bindings as pycimmb
from openlmi.software.util import singletonmixin
-re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
+RE_EVRA = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
+RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-rpmdb_path = '/var/lib/rpm/Packages'
+RPMDB_PATH = '/var/lib/rpm/Packages'
class YumDB(singletonmixin.Singleton):
+ """
+ Context manager for accessing yum/rpm database.
+ """
+ # this is to inform Singleton, that __init__ should be called only once
ignoreSubsequent = True
def __init__(self, env, *args, **kwargs):
@@ -63,14 +67,23 @@ class YumDB(singletonmixin.Singleton):
env.get_logger().log_info('init called')
def is_dirty(self):
- return self._db_mtime < os.stat(rpmdb_path).st_mtime
+ """
+ @return True if rpm database has been modified since last update
+ """
+ return self._db_mtime < os.stat(RPMDB_PATH).st_mtime
def is_locked(self):
+ """
+ @return True if rpm database is locked
+ """
return self._yum._lockfile is not None
def update_db(self):
+ """
+ Call to update database metadata.
+ """
self.env.get_logger().log_info('updating rpmdb')
- self._db_mtime = os.stat(rpmdb_path).st_mtime
+ self._db_mtime = os.stat(RPMDB_PATH).st_mtime
self._yum.doConfigSetup()
self._yum.doTsSetup()
self._yum.doRpmDBSetup()
@@ -96,6 +109,9 @@ class YumDB(singletonmixin.Singleton):
def _get_distname():
+ """
+ @return name of linux distribution
+ """
if hasattr(platform, 'linux_distribution'):
return platform.linux_distribution(
full_distribution_name=False)[0].lower()
@@ -138,11 +154,14 @@ def get_target_operating_system():
else:
return (0, 'Unknown')
-def get_computer_system_op():
+def get_computer_system_op(prefix='Linux'):
+ """
+ @return object path of CIM_ComputerSystem for this system
+ """
return pywbem.CIMInstanceName(
- classname='CIM_ComputerSystem',
+ classname='%s_ComputerSystem' % prefix,
keybindings={
- "CreationClassName": "CIM_ComputerSystem"
+ "CreationClassName": "%s_ComputerSystem" % prefix
, "Name" : socket.gethostname() },
namespace="root/cimv2")
@@ -156,9 +175,11 @@ def check_target_operating_system(system):
raise TypeError("system must be either string or integer, not {}"
.format(system.__class__.__name__))
tos = get_target_operating_system()
- if system == tos: return True
+ if system == tos:
+ return True
if system == 36: # linux
- if platform.system().lower() == "linux": return True
+ if platform.system().lower() == "linux":
+ return True
if ( system >= 97 and system <= 100 # linux 2.x.x
and platform.uname()[2].startswith('2.4' if system < 99 else '2.6')
# check machine
@@ -168,12 +189,18 @@ def check_target_operating_system(system):
return False
def check_computer_system_op(env, system):
+ """
+ @param system is object path referring to CIM_ComputerSystem instance
+ passed as argument to some cim function
+ @return True if this instance matches our system; otherwise a CIMError
+ will be raised
+ """
if not isinstance(system, pywbem.CIMInstanceName):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"\"System\" must be a CIMInstanceName")
- our_system = get_computer_system_op()
- ch = env.get_cimom_handle()
- if not ch.is_subclass(system.namespace,
+ our_system = get_computer_system_op('CIM')
+ chandle = env.get_cimom_handle()
+ if not chandle.is_subclass(system.namespace,
sub=system.classname,
super=our_system.classname):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
@@ -182,7 +209,7 @@ def check_computer_system_op(env, system):
if not 'CreationClassName' in system or not 'Name' in system:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"\"System\" is missing one of keys")
- if not ch.is_subclass(system.namespace,
+ if not chandle.is_subclass(system.namespace,
sub=system['CreationClassName'],
super=our_system.classname):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
@@ -204,128 +231,72 @@ def match_pkg(pkg, **kwargs):
name, epoch, version, release, arch
evra, nevra
"""
- for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'):
- v = kwargs.get(a, None)
- if v and getattr(pkg, a) != v:
+ for attr in ( 'evra', 'nevra', 'name', 'epoch'
+ , 'version', 'release', 'arch'):
+ value = kwargs.get(attr, None)
+ if value and getattr(pkg, attr) != value:
return False
return True
class SoftwarePackage:
"""
Just a namespace for common function related to SoftwarePackage provider.
+ TODO: make it a submodule
"""
@staticmethod
- def parse_group(group, env):
- try:
- res = \
- { "amusements/games" : 3
- , "amusements/graphics" : 4
- , "applications/archiving" : 5
- , "applications/communications" : 6
- , "applications/databases" : 7
- , "applications/editors" : 8
- , "applications/emulators" : 9
- , "applications/engineering" : 10
- , "applications/file" : 11
- , "applications/internet" : 12
- , "applications/multimedia" : 13
- , "applications/productivity" : 14
- , "applications/publishing" : 15
- , "applications/system" : 16
- , "applications/text" : 17
- , "development/build tools" : 18
- , "development/debug" : 19
- , "development/debuggers" : 20
- , "development/documentation" : 21
- , "development/java" : 22
- , "development/languages" : 23
- , "development/libraries" : 24
- , "development/libraries/java" : 25
- , "development/system" : 26
- , "development/tools" : 27
- , "documentation" : 28
- , "internet/www/dynamic content" : 29
- , "system/libraries" : 30
- , "system environment/base" : 31
- , "system environment/daemons" : 32
- , "system environment/kernel" : 33
- , "system environment/libraries" : 34
- , "system environment/shells" : 35
- , "text processing/markup/xml" : 36
- , "user interface/desktops" : 37
- , "user interface/x" : 38
- , "user interface/x hardware support" : 39
- , "utilities" : 40
- }[group.lower()]
- except KeyError:
- logger = env.get_logger()
- if not group or group.lower() == "unspecified":
- logger.log_info("unspecified group '{}'".format(group))
- res = 2
- else:
- logger.log_error("failed to parse group '{}'".format(group))
- res = 0
- return pywbem.Uint16(res)
-
- @staticmethod
- def object_path2pkg(env, op, package_list='installed'):
+ def object_path2pkg(env, objpath, package_list='installed'):
"""
- @param op must contain precise information of package,
+ @param objpath must contain precise information of package,
otherwise a CIM_ERR_NOT_FOUND error is raised
@param package_list one of {'installed', 'all', 'available'}
says, where to look for given package
"""
- if not isinstance(op, pywbem.CIMInstanceName):
- raise TypeError("op must be an instance of CIMInstanceName")
+ if not isinstance(objpath, pywbem.CIMInstanceName):
+ raise TypeError("objpath must be an instance of CIMInstanceName")
if not isinstance(package_list, basestring):
raise TypeError("package_list must be a string")
if not package_list in ('installed', 'all', 'available'):
raise ValueError('unsupported package list "%s"'%package_list)
- tos = get_target_operating_system()[0]
- if ( not op['Name'] or not op['SoftwareElementID']
- or not op['SoftwareElementID'].startswith(op['Name'])
- or op['SoftwareElementID'].find(op['Version']) == -1):
+ if ( not objpath['Name'] or not objpath['SoftwareElementID']
+ or not objpath['SoftwareElementID'].startswith(objpath['Name'])
+ or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
-# if op['SoftwareElementState'] not in ("2", 2):
-# raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
-# "Only \"Executable\" software element state supported")
- if not check_target_operating_system(op['TargetOperatingSystem']):
+ if not check_target_operating_system(objpath['TargetOperatingSystem']):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong target operating system.")
- if not op['Name'] or not op['Version']:
+ if not objpath['Name'] or not objpath['Version']:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
'Both "Name" and "Version" must be given')
- m = re_nevra.match(op['SoftwareElementID'])
- if not m:
+ match = RE_NEVRA.match(objpath['SoftwareElementID'])
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong SotwareElementID. Expected valid nevra"
" (name-[epoch:]version-release.arch).")
- if op['Version'] != m.group('ver'):
+ if objpath['Version'] != match.group('ver'):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Version does not match version part in SoftwareElementID.")
evra = "{}:{}-{}.{}".format(*(
- (m.group(k) if k != "epoch" or m.group(k) else "0")
+ (match.group(k) if k != "epoch" or match.group(k) else "0")
for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as yb:
- pl = yb.doPackageLists(package_list,
+ with YumDB.getInstance(env) as ydb:
+ pkglist = ydb.doPackageLists(package_list,
showdups=package_list != 'installed')
if package_list != 'all':
- pl = getattr(pl, package_list)
+ pkglist = getattr(pkglist, package_list)
else:
# NOTE: available ∩ installed = ∅
- pl = itertools.chain(pl.available, pl.installed)
- exact,_,_ = yum.packages.parsePackages(pl, [op['Name']])
+ pkglist = itertools.chain(pkglist.available, pkglist.installed)
+ exact, _, _ = yum.packages.parsePackages(pkglist, [objpath['Name']])
for pkg in yum.misc.unique(exact):
- if pkg.evra == evra: break
- else:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
- return pkg
+ if pkg.evra == evra:
+ return pkg
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No matching package found.")
@staticmethod
- def object_path2pkg_search(env, op):
+ def object_path2pkg_search(env, objpath):
"""
similar to object_path2pkg, but tries to find best suitable
package matching keys
@@ -333,40 +304,45 @@ class SoftwarePackage:
If any matching package is already installed, it is returned.
Otherwise available package with highest version is returned.
- @param op may be object of CIMInstance or CIMInstanceName and
+ @param objpath may be object of CIMInstance or CIMInstanceName and
must contain at least \"Name\" or \"SoftwareElementID\"
@return instance of yum.rpmsack.RPMInstalledPackage in case of
installed package, otherwise yum.packages.YumAvailablePackage
"""
logger = env.get_logger()
- if isinstance(op, pywbem.CIMInstance):
+ if isinstance(objpath, pywbem.CIMInstance):
def _get_key(k):
- v = op.properties.get(k, None)
- if isinstance(v, pywbem.CIMProperty): return v.value
- if v is not None: return v
+ """@return value of instance's key"""
+ value = objpath.properties.get(k, None)
+ if isinstance(value, pywbem.CIMProperty):
+ return value.value
+ if value is not None:
+ return value
logger.log_error('missing key "{}" in inst.props'.format(k))
- return op.path[k] if k in op.path else None
- elif isinstance(op, pywbem.CIMInstanceName):
- _get_key = lambda k: op[k] if k in op else None
+ return objpath.path[k] if k in objpath.path else None
+ elif isinstance(objpath, pywbem.CIMInstanceName):
+ _get_key = lambda k: objpath[k] if k in objpath else None
else:
- raise TypeError("op must be either CIMInstance or CIMInstanceName")
+ raise TypeError("objpath must be either CIMInstance"
+ "or CIMInstanceName")
# parse and check arguments
match_props = {} # args for match_pkg
if _get_key('SoftwareElementID'):
- m = re_nevra.match(_get_key('SoftwareElementID'))
- if not m:
+ match = RE_NEVRA.match(_get_key('SoftwareElementID'))
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
"SoftwareElementID could not be parsed.")
for k in ('name', 'version', 'release', 'arch'):
- mk = k if k not in ('version', 'release') else k[:3]
- match_props[k] = m.group(mk)
- if not m.group("epoch"):
+ mkey = k if k not in ('version', 'release') else k[:3]
+ match_props[k] = match.group(mkey)
+ if not match.group("epoch"):
match_props["epoch"] = "0"
else:
for k in ('name', 'epoch', 'version', 'release', 'arch'):
- ik = k if k != 'arch' else "architecture"
- if _get_key(ik): match_props[k] = _get_key(ik)
+ ikey = k if k != 'arch' else "architecture"
+ if _get_key(ikey):
+ match_props[k] = _get_key(ikey)
if not match_props:
raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
@@ -376,10 +352,10 @@ class SoftwarePackage:
"Missing either Name or SoftwareElementID property.")
# get available packages
- pl = YumDB.getInstance(env).doPackageLists('all', showdups=True)
+ pkglist = YumDB.getInstance(env).doPackageLists('all', showdups=True)
# NOTE: available ∩ installed = ∅
- exact,_,_ = yum.packages.parsePackages(
- itertools.chain(pl.available, pl.installed),
+ exact, _, _ = yum.packages.parsePackages(
+ itertools.chain(pkglist.available, pkglist.installed),
[match_props['name']])
exact = yum.misc.unique(exact)
exact_orig = exact
@@ -401,63 +377,57 @@ class SoftwarePackage:
return exact[-1] # select highest version
@staticmethod
- def pkg2model_wrapper(namespace, classname):
- """
- @return a function that transforms YumAvailablePackage object
- to CIMInstanceName object
- """
-
- tos = pywbem.Uint16(get_target_operating_system()[0])
-
- def pkg2model(env, pkg, keys_only=True, model=None):
- """
- @param model if None, will be filled with data, otherwise
- a new instance of CIMInstance or CIMObjectPath is created
- """
- #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- #if not isinstance(pkg, yum.packages.YumHeaderPackage):
- if not isinstance(pkg, yum.packages.YumAvailablePackage):
- raise TypeError(
- "pkg must be an instance of YumAvailablePackage")
- if model is None:
- model = pywbem.CIMInstanceName(classname, namespace=namespace)
- if not keys_only:
- model = pywbem.CIMInstance(classname, path=model)
- if isinstance(model, pywbem.CIMInstance):
- def _set_key(k, v):
- model[k] = v
- model.path[k] = v
- else:
- _set_key = model.__setitem__
- with YumDB.getInstance(env):
- _set_key('Name', pkg.name)
- _set_key('SoftwareElementID', pkg.nevra)
- _set_key('SoftwareElementState', pywbem.Uint16(2
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
- else 1))
- _set_key('TargetOperatingSystem', tos)
- _set_key('Version', pkg.version)
- if not keys_only:
- model['Caption'] = pkg.summary
- model['Description'] = pkg.description
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- model['InstallDate'] = pywbem.CIMDateTime(
- datetime.fromtimestamp(pkg.installtime))
- if pkg.vendor:
- model['Manufacturer'] = pkg.vendor
- model['Release'] = pkg.release
- model['Epoch'] = pywbem.Uint16(pkg.epoch)
- model["Architecture"] = pkg.arch
- model['License'] = pkg.license
- model['Group'] = pkg.group
- model['Size'] = pywbem.Uint64(pkg.size)
- return model
-
- return pkg2model
+ def pkg2model(env, pkg, keys_only=True, model=None):
+ """
+ @param model if None, will be filled with data, otherwise
+ a new instance of CIMInstance or CIMObjectPath is created
+ """
+ #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ #if not isinstance(pkg, yum.packages.YumHeaderPackage):
+ if not isinstance(pkg, yum.packages.YumAvailablePackage):
+ raise TypeError(
+ "pkg must be an instance of YumAvailablePackage")
+ if model is None:
+ model = pywbem.CIMInstanceName('LMI_SoftwarePackage',
+ namespace='root/cimv2')
+ if not keys_only:
+ model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model)
+ if isinstance(model, pywbem.CIMInstance):
+ def _set_key(k, value):
+ """Sets the value of key property of cim instance"""
+ model[k] = value
+ model.path[k] = value #pylint: disable=E1103
+ else:
+ _set_key = model.__setitem__
+ with YumDB.getInstance(env):
+ _set_key('Name', pkg.name)
+ _set_key('SoftwareElementID', pkg.nevra)
+ _set_key('SoftwareElementState', pywbem.Uint16(2
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
+ else 1))
+ _set_key('TargetOperatingSystem',
+ pywbem.Uint16(get_target_operating_system()[0]))
+ _set_key('Version', pkg.version)
+ if not keys_only:
+ model['Caption'] = pkg.summary
+ model['Description'] = pkg.description
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ model['InstallDate'] = pywbem.CIMDateTime(
+ datetime.fromtimestamp(pkg.installtime))
+ if pkg.vendor:
+ model['Manufacturer'] = pkg.vendor
+ model['Release'] = pkg.release
+ model['Epoch'] = pywbem.Uint16(pkg.epoch)
+ model["Architecture"] = pkg.arch
+ model['License'] = pkg.license
+ model['Group'] = pkg.group
+ model['Size'] = pywbem.Uint64(pkg.size)
+ return model
class SoftwareFileCheck:
"""
Just a namespace for functions related to FileCheck provider.
+ TODO: make it a submodule
"""
passed_flags_descriptions = (
@@ -471,7 +441,7 @@ class SoftwareFileCheck:
"User Ownership", "Group Ownership",
"Modify Time")
- checksumtype_str2num = dict((v, k) for (k, v) in
+ checksumtype_str2num = dict((val, k) for (k, val) in
yum.constants.RPM_CHECKSUM_TYPES.items())
@staticmethod
@@ -483,23 +453,36 @@ class SoftwareFileCheck:
raise TypeError("pkg must be an instance of YumAvailablePackage")
if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
- with self: # ensure, that _yum is inited
+ with YumDB.getInstance(): # ensure, that _yum is inited
return SoftwareFileCheck.checksumtype_str2pywbem(
pkg.yumdb_info.checksum_type)
@staticmethod
def checksumtype_num2hash(csumt):
+ """
+ @param csumt checksum type as a number obtained from package
+ @return hash function object corresponding to csumt
+ """
return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])
@staticmethod
def checksumtype_str2pywbem(alg):
+ """
+ @param alg is a name of algorithm used for checksum
+ @return pywbem number corresponding to given alg
+ """
try:
res = SoftwareFileCheck.checksumtype_str2num[alg.lower()]
- except KeyError: res = 0
+ except KeyError:
+ res = 0
return pywbem.Uint16(res)
@staticmethod
- def filetype_str2pywbem(ft):
+ def filetype_str2pywbem(file_type):
+ """
+ @param file_type is a name of file type obtained from pkg headers
+ @return pywbem number corresponding to thus file type
+ """
try:
return pywbem.Uint16(
{ 'file' : 1
@@ -508,12 +491,16 @@ class SoftwareFileCheck:
, 'fifo' : 4
, 'character device' : 5
, 'block device' : 6
- }[ft])
+ }[file_type])
except KeyError:
return pywbem.Uint16(0)
@staticmethod
def filetype_mode2pywbem(mode):
+ """
+ @param mode is a raw file mode as integer
+ @return pywbem numeric value of file's type
+ """
for i, name in enumerate(
('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
if getattr(stat, 'S_IS' + name)(mode):
@@ -524,10 +511,12 @@ class SoftwareFileCheck:
def mode2pywbem_flags(mode):
"""
@param mode if None, file does not exist
+ @return list of integer flags describing file's access permissions
"""
- if mode is None: return None
+ if mode is None:
+ return None
flags = []
- for i, c in enumerate((
+ for i, flag in enumerate((
stat.S_IXOTH,
stat.S_IWOTH,
stat.S_IROTH,
@@ -540,7 +529,7 @@ class SoftwareFileCheck:
stat.S_ISVTX,
stat.S_ISGID,
stat.S_ISUID)):
- if c & mode:
+ if flag & mode:
flags.append(pywbem.Uint8(i))
return flags
@@ -555,9 +544,10 @@ class SoftwareFileCheck:
hashers = (hashers, )
buf = afile.read(blocksize)
while len(buf) > 0:
- for h in hashers: h.update(buf)
+ for hashfunc in hashers:
+ hashfunc.update(buf)
buf = afile.read(blocksize)
- return [ h.hexdigest() for h in hashers ]
+ return [ hashfunc.hexdigest() for hashfunc in hashers ]
@staticmethod
def compute_checksums(env, checksum_type, file_type, file_path):
@@ -570,91 +560,86 @@ class SoftwareFileCheck:
first one is always md5, the second one depends on checksum_type
if file does not exists, (None, None) is returned
"""
- hashers = [hashlib.md5()]
+ hashers = [hashlib.md5()] #pylint: disable=E1101
if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]:
- hashers.append(SoftwareFileCheck.checksumtype_num2hash(checksum_type)())
+ hashers.append(SoftwareFileCheck.checksumtype_num2hash(
+ checksum_type)())
if file_type != SoftwareFileCheck.filetype_str2pywbem('file'):
rslts = ['0'*len(h.hexdigest()) for h in hashers]
else:
try:
- with open(file_path, 'rb') as f:
- rslts = SoftwareFileCheck.hashfile(f, hashers)
- except (OSError, IOError) as e:
+ with open(file_path, 'rb') as fobj:
+ rslts = SoftwareFileCheck.hashfile(fobj, hashers)
+ except (OSError, IOError) as exc:
env.get_logger().log_error("could not open file \"%s\""
- " for reading: %s" % (file_path, e))
+ " for reading: %s" % (file_path, exc))
return None, None
return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)
@staticmethod
- def object_path2yumcheck(env, op):
+ def object_path2yumcheck(env, objpath):
"""
@return instance of yum.packages._RPMVerifyPackage
this object holds RPMInstalledPackage under its po attribute
"""
- if not isinstance(op, pywbem.CIMInstanceName):
- raise TypeError("op must be instance of CIMInstanceName, "
- "not \"%s\"" % op.__class__.__name__)
- log = env.get_logger()
-
- tos = get_target_operating_system()[0]
- if ( not op['Name'] or not op['SoftwareElementID']
- or not op['CheckID']
- or not op['CheckID'].endswith('#'+op['Name'])
- or op['SoftwareElementID'].find(op['Version']) == -1):
+ if not isinstance(objpath, pywbem.CIMInstanceName):
+ raise TypeError("objpath must be instance of CIMInstanceName, "
+ "not \"%s\"" % objpath.__class__.__name__)
+
+ if ( not objpath['Name'] or not objpath['SoftwareElementID']
+ or not objpath['CheckID']
+ or not objpath['CheckID'].endswith('#'+objpath['Name'])
+ or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if op['SoftwareElementState'] not in ("2", 2):
+ if objpath['SoftwareElementState'] not in ("2", 2):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Only \"Executable\" software element state supported")
- if not check_target_operating_system(op['TargetOperatingSystem']):
+ if not check_target_operating_system(objpath['TargetOperatingSystem']):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong target operating system.")
- if not op['Name'] or not op['Version']:
+ if not objpath['Name'] or not objpath['Version']:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
'Both "Name" and "Version" must be given')
- m = re_nevra.match(op['SoftwareElementID'])
- if not m:
+ match = RE_NEVRA.match(objpath['SoftwareElementID'])
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong SotwareElementID. Expected valid nevra"
" (name-epoch:version-release.arch).")
- if op['Version'] != m.group('ver'):
+ if objpath['Version'] != match.group('ver'):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Version does not match version part in SoftwareElementID.")
evra = "{}:{}-{}.{}".format(*(
- (m.group(k) if k != "epoch" or m.group(k) else "0")
+ (match.group(k) if k != "epoch" or match.group(k) else "0")
for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as yb:
- pl = yb.doPackageLists('installed')
- exact, matched, unmatched = yum.packages.parsePackages(
- pl.installed, [m.group('name')])
- exact = yum.misc.unique(exact)
- for pkg in exact:
- if pkg.evra == evra: break
- else:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ with YumDB.getInstance(env) as ydb:
+ pkglist = ydb.doPackageLists('installed')
+ exact, _, _ = yum.packages.parsePackages(
+ pkglist.installed, [match.group('name')])
+ for pkg in yum.misc.unique(exact):
+ if pkg.evra != evra:
+ continue
+ vpkg = yum.packages._RPMVerifyPackage(
+ pkg, pkg.hdr.fiFromHeader(),
+ SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
+ if not objpath['Name'] in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File not found in RPM package.")
+ return vpkg
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"No matching package installed.")
- fi = pkg.hdr.fiFromHeader()
- vpkg = yum.packages._RPMVerifyPackage(pkg, fi,
- SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
- if not op['Name'] in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File not found in RPM package.")
- return vpkg
-
- """
- Named tuple to store results of rpm file check as pywbem values,
- all results are in form:
- (expected, reality)
- where
- expected is value from rpm package
- reality is value obtained from installed file
- None means, that value could not be obtained
- except for "exists" and "md5_checksum" attributes, where "exists"
- is boolean and md5_checksum is a string
- for example:
- file_check.file_type == (4, 3)
- """
+ # Named tuple to store results of rpm file check as pywbem values,
+ # all results are in form:
+ # (expected, reality)
+ # where
+ # expected is value from rpm package
+ # reality is value obtained from installed file
+ # None means, that value could not be obtained
+ # except for "exists" and "md5_checksum" attributes, where "exists"
+ # is boolean and md5_checksum is a string
+ # for example:
+ # file_check.file_type == (4, 3)
FileCheck = collections.namedtuple('FileCheck',
'exists, md5_checksum, file_type, file_size, file_mode, '
'file_checksum, device, link_target, user_id, group_id, '
@@ -688,7 +673,8 @@ class SoftwareFileCheck:
else:
fstat = os.lstat(vpf.filename)
reality = {
- "file_type" : SoftwareFileCheck.filetype_mode2pywbem(fstat.st_mode),
+ "file_type" : SoftwareFileCheck.filetype_mode2pywbem(
+ fstat.st_mode),
"user_id" : pywbem.Uint32(fstat.st_uid),
"group_id" : pywbem.Uint32(fstat.st_gid),
"file_mode" : pywbem.Uint32(fstat.st_mode),
@@ -696,8 +682,8 @@ class SoftwareFileCheck:
"last_modification_time" : pywbem.Uint64(fstat.st_mtime)
}
reality["device"] = (pywbem.Uint64(fstat.st_dev)
- if reality['file_type'] == SoftwareFileCheck.filetype_str2pywbem(
- "device") else None)
+ if reality['file_type'] ==
+ SoftwareFileCheck.filetype_str2pywbem("device") else None)
reality["link_target"] = (os.readlink(vpf.filename)
if os.path.islink(vpf.filename) else None)
md5_checksum, checksum = SoftwareFileCheck.compute_checksums(
@@ -708,102 +694,119 @@ class SoftwareFileCheck:
return SoftwareFileCheck.FileCheck(**kwargs)
@staticmethod
- def filecheck_passed(fc):
- if not isinstance(fc, SoftwareFileCheck.FileCheck):
- raise TypeError("fc must be an instance of FileCheck")
- return ( fc.exists
- and all( v[0] == v[1]
- for k, v in fc._asdict().items() if (
- isinstance(v, tuple)
- and ( k != "last_modification_time"
- or fc.file_type[0] == \
+ def filecheck_passed(file_check):
+ """
+ @return True if installed file passed all checks.
+ """
+ if not isinstance(file_check, SoftwareFileCheck.FileCheck):
+ raise TypeError("file_check must be an instance of FileCheck")
+ return ( file_check.exists
+ and all( val[0] == val[1]
+ #pylint: disable=W0212
+ for k, val in file_check._asdict().items()
+ if ( isinstance(val, tuple)
+ and ( k != "last_modification_time"
+ or file_check.file_type[0] == \
SoftwareFileCheck.filetype_str2pywbem("file")
- ))))
+ ))))
+
+ @staticmethod
+ def _filecheck2model_flags(file_check):
+ """
+ @param file_check is an instance of FileCheck
+ @return pywbem value for PassedFlags property
+ """
+ flags = []
+ for k, value in file_check._asdict().items(): #pylint: disable=W0212
+ if isinstance(value, tuple):
+ if ( k != "last_modification_time"
+ or file_check.file_type[0] ==
+ SoftwareFileCheck.filetype_str2pywbem('file')):
+ # last_modification_time check is valid only for
+ # regular files
+ flag = file_check.exists and value[0] == value[1]
+ else:
+ flag = True
+ flags.append(flag)
+ elif isinstance(value, bool):
+ flags.append(value)
+ return flags
@staticmethod
- def filecheck_wrapper(namespace, classname):
-
- def _filecheck2model_flags(fc):
- flags = []
- for k, v in fc._asdict().items():
- if isinstance(v, tuple):
- if ( k != "last_modification_time"
- or fc.file_type[0] == SoftwareFileCheck.filetype_str2pywbem(
- 'file')):
- # last_modification_time check is valid only for
- # regular files
- flag = fc.exists and v[0] == v[1]
- else:
- flag = True
- flags.append(flag)
- elif isinstance(v, bool):
- flags.append(v)
- return flags
-
- def filecheck2model(vpkg, fn, env, keys_only=True,
- model=None, fc=None):
- if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
- raise TypeError(
- "vpkg must be an instance of _RPMVerifyPackage")
- if not fn in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File \"%s\" not found among package files" % fn)
- if model is None:
- model = pywbem.CIMInstanceName(classname, namespace=namespace)
- if not keys_only:
- model = pywbem.CIMInstance(classname, path=model)
- if fc is not None:
- if not isinstance(fc, SoftwareFileCheck.FileCheck):
- raise TypeError("fc must be an instance of FileCheck")
- pkg = vpkg.po
- vpf = vpkg._files[fn]
- model['Name'] = vpf.filename
- model['SoftwareElementID'] = pkg.nevra
- model['SoftwareElementState'] = pywbem.Uint16(2)
- model['TargetOperatingSystem'] = pywbem.Uint16(
- get_target_operating_system()[0])
- model['Version'] = pkg.version
- model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
+ def _fill_non_key_values(env, model, pkg, vpf, file_check):
+ """
+ Fills a non key values into instance of SoftwareFileCheck.
+ """
+ model['FileName'] = os.path.basename(vpf.filename)
+ model['FileChecksumType'] = csumt = \
+ pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
+ if file_check is None:
+ file_check = SoftwareFileCheck.test_file(env, csumt, vpf)
+ for mattr, fattr in (
+ ('FileType', 'file_type'),
+ ('FileUserID', 'user_id'),
+ ('FileGroupID', 'group_id'),
+ ('FileMode', 'file_mode'),
+ ('LastModificationTime', 'last_modification_time'),
+ ('FileSize', 'file_size'),
+ ('LinkTarget', 'link_target'),
+ ('FileChecksum', 'file_checksum')):
+ exp, rea = getattr(file_check, fattr)
+ if exp is not None:
+ model['Expected' + mattr] = exp
+ if rea is not None:
+ model[mattr] = rea
+ model['ExpectedFileModeFlags'] = \
+ SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[0])
+ if file_check.exists:
+ model['FileModeFlags'] = \
+ SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[1])
+ model['FileExists'] = file_check.exists
+ if file_check.md5_checksum is not None:
+ model['MD5Checksum'] = file_check.md5_checksum
+ model['PassedFlags'] = SoftwareFileCheck._filecheck2model_flags(
+ file_check)
+ model['PassedFlagsDescriptions'] = list(
+ SoftwareFileCheck.passed_flags_descriptions)
+
+ @staticmethod
+ def filecheck2model(vpkg, file_name, env, keys_only=True,
+ model=None, file_check=None):
+ """
+ @param vpkg is an instance of yum.packages_RPMVerifyPackage
+ @param file_name a absolute file path contained in package
+ @param keys_only if True, then only key values will be filed
+ @param model if given, then this instance will be modified and
+ returned
+ @param file_check if not given, it will be computed
+ @return instance of LMI_SoftwareFileCheck class with all desired
+ values filed
+ """
+ if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
+ raise TypeError(
+ "vpkg must be an instance of _RPMVerifyPackage")
+ if not file_name in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File \"%s\" not found among package files" % file_name)
+ if model is None:
+ model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck",
+ namespace="root/cimv2")
if not keys_only:
- #model['Caption'] = '' # TODO
- #model['CheckMode'] = bool() # TODO
- #model['CRC1'] = pywbem.Uint32() # TODO
- #model['CRC2'] = pywbem.Uint32() # TODO
- #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO
- #model['Description'] = '' # TODO
- #model['ElementName'] = '' # TODO
- #model['InstanceID'] = '' # TODO
- model['FileName'] = os.path.basename(vpf.filename)
- model['FileChecksumType'] = csumt = \
- pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
- if fc is None:
- fc = SoftwareFileCheck.test_file(env, csumt, vpf)
- for mattr, fattr in (
- ('FileType', 'file_type'),
- ('FileUserID', 'user_id'),
- ('FileGroupID', 'group_id'),
- ('FileMode', 'file_mode'),
- ('LastModificationTime', 'last_modification_time'),
- ('FileSize', 'file_size'),
- ('LinkTarget', 'link_target'),
- ('FileChecksum', 'file_checksum')):
- exp, rea = getattr(fc, fattr)
- if exp is not None:
- model['Expected' + mattr] = exp
- if rea is not None:
- model[mattr] = rea
- model['ExpectedFileModeFlags'] = \
- SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0])
- if fc.exists:
- model['FileModeFlags'] = SoftwareFileCheck.mode2pywbem_flags(
- fc.file_mode[1])
- model['FileExists'] = fc.exists
- if fc.md5_checksum is not None:
- model['MD5Checksum'] = fc.md5_checksum
- model['PassedFlags'] = _filecheck2model_flags(fc)
- model['PassedFlagsDescriptions'] = list(
- SoftwareFileCheck.passed_flags_descriptions)
- return model
-
- return filecheck2model
+ model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model)
+ if file_check is not None:
+ if not isinstance(file_check, SoftwareFileCheck.FileCheck):
+ raise TypeError("file_check must be an instance of FileCheck")
+ pkg = vpkg.po
+ vpf = vpkg._files[file_name]
+ model['Name'] = vpf.filename
+ model['SoftwareElementID'] = pkg.nevra
+ model['SoftwareElementState'] = pywbem.Uint16(2)
+ model['TargetOperatingSystem'] = pywbem.Uint16(
+ get_target_operating_system()[0])
+ model['Version'] = pkg.version
+ model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
+ if not keys_only:
+ SoftwareFileCheck._fill_non_key_values(
+ env, model, pkg, vpf, file_check)
+ return model
diff --git a/src/software/test/test_software_installed_package.py b/src/software/test/test_software_installed_package.py
index 92d19ac..54b8f35 100755
--- a/src/software/test/test_software_installed_package.py
+++ b/src/software/test/test_software_installed_package.py
@@ -27,7 +27,7 @@ import stat
class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
CLASS_NAME = "LMI_SoftwareInstalledPackage"
- KEYS = ( "Software", "System")
+ KEYS = ("Software", "System")
def make_op(self, name, epoch, ver, rel, arch, ses=2):
op = self.op.copy()
@@ -41,9 +41,9 @@ class TestSoftwareInstalledPackage(SoftwareBaseTestCase):
"TargetOperatingSystem" : pywbem.Uint16(36),
"Version" : ver })
system_op = pywbem.CIMInstanceName(
- classname="CIM_ComputerSystem", namespace="root/cimv2",
+ classname="Linux_ComputerSystem", namespace="root/cimv2",
keybindings={
- "CreationClassName" : "CIM_ComputerSystem",
+ "CreationClassName" : "Linux_ComputerSystem",
"Name" : socket.gethostname() })
op["Software"] = pkg_op
op["System"] = system_op