summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/util/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/openlmi/software/util/common.py')
-rw-r--r--src/software/openlmi/software/util/common.py665
1 files changed, 334 insertions, 331 deletions
diff --git a/src/software/openlmi/software/util/common.py b/src/software/openlmi/software/util/common.py
index 7b54f1e..7deef32 100644
--- a/src/software/openlmi/software/util/common.py
+++ b/src/software/openlmi/software/util/common.py
@@ -40,15 +40,19 @@ import yum
import cmpi_pywbem_bindings as pycimmb
from openlmi.software.util import singletonmixin
-re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
+RE_EVRA = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
+RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-rpmdb_path = '/var/lib/rpm/Packages'
+RPMDB_PATH = '/var/lib/rpm/Packages'
class YumDB(singletonmixin.Singleton):
+ """
+ Context manager for accessing yum/rpm database.
+ """
+ # this is to inform Singleton, that __init__ should be called only once
ignoreSubsequent = True
def __init__(self, env, *args, **kwargs):
@@ -63,14 +67,23 @@ class YumDB(singletonmixin.Singleton):
env.get_logger().log_info('init called')
def is_dirty(self):
- return self._db_mtime < os.stat(rpmdb_path).st_mtime
+ """
+ @return True if rpm database has been modified since last update
+ """
+ return self._db_mtime < os.stat(RPMDB_PATH).st_mtime
def is_locked(self):
+ """
+ @return True if rpm database is locked
+ """
return self._yum._lockfile is not None
def update_db(self):
+ """
+ Call to update database metadata.
+ """
self.env.get_logger().log_info('updating rpmdb')
- self._db_mtime = os.stat(rpmdb_path).st_mtime
+ self._db_mtime = os.stat(RPMDB_PATH).st_mtime
self._yum.doConfigSetup()
self._yum.doTsSetup()
self._yum.doRpmDBSetup()
@@ -96,6 +109,9 @@ class YumDB(singletonmixin.Singleton):
def _get_distname():
+ """
+ @return name of linux distribution
+ """
if hasattr(platform, 'linux_distribution'):
return platform.linux_distribution(
full_distribution_name=False)[0].lower()
@@ -138,11 +154,14 @@ def get_target_operating_system():
else:
return (0, 'Unknown')
-def get_computer_system_op():
+def get_computer_system_op(prefix='Linux'):
+ """
+ @return object path of CIM_ComputerSystem for this system
+ """
return pywbem.CIMInstanceName(
- classname='CIM_ComputerSystem',
+ classname='%s_ComputerSystem' % prefix,
keybindings={
- "CreationClassName": "CIM_ComputerSystem"
+ "CreationClassName": "%s_ComputerSystem" % prefix
, "Name" : socket.gethostname() },
namespace="root/cimv2")
@@ -156,9 +175,11 @@ def check_target_operating_system(system):
raise TypeError("system must be either string or integer, not {}"
.format(system.__class__.__name__))
tos = get_target_operating_system()
- if system == tos: return True
+ if system == tos:
+ return True
if system == 36: # linux
- if platform.system().lower() == "linux": return True
+ if platform.system().lower() == "linux":
+ return True
if ( system >= 97 and system <= 100 # linux 2.x.x
and platform.uname()[2].startswith('2.4' if system < 99 else '2.6')
# check machine
@@ -168,12 +189,18 @@ def check_target_operating_system(system):
return False
def check_computer_system_op(env, system):
+ """
+ @param system is object path referring to CIM_ComputerSystem instance
+ passed as argument to some cim function
+ @return True if this instance matches our system; otherwise a CIMError
+ will be raised
+ """
if not isinstance(system, pywbem.CIMInstanceName):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"\"System\" must be a CIMInstanceName")
- our_system = get_computer_system_op()
- ch = env.get_cimom_handle()
- if not ch.is_subclass(system.namespace,
+ our_system = get_computer_system_op('CIM')
+ chandle = env.get_cimom_handle()
+ if not chandle.is_subclass(system.namespace,
sub=system.classname,
super=our_system.classname):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
@@ -182,7 +209,7 @@ def check_computer_system_op(env, system):
if not 'CreationClassName' in system or not 'Name' in system:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"\"System\" is missing one of keys")
- if not ch.is_subclass(system.namespace,
+ if not chandle.is_subclass(system.namespace,
sub=system['CreationClassName'],
super=our_system.classname):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
@@ -204,128 +231,72 @@ def match_pkg(pkg, **kwargs):
name, epoch, version, release, arch
evra, nevra
"""
- for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'):
- v = kwargs.get(a, None)
- if v and getattr(pkg, a) != v:
+ for attr in ( 'evra', 'nevra', 'name', 'epoch'
+ , 'version', 'release', 'arch'):
+ value = kwargs.get(attr, None)
+ if value and getattr(pkg, attr) != value:
return False
return True
class SoftwarePackage:
"""
Just a namespace for common function related to SoftwarePackage provider.
+ TODO: make it a submodule
"""
@staticmethod
- def parse_group(group, env):
- try:
- res = \
- { "amusements/games" : 3
- , "amusements/graphics" : 4
- , "applications/archiving" : 5
- , "applications/communications" : 6
- , "applications/databases" : 7
- , "applications/editors" : 8
- , "applications/emulators" : 9
- , "applications/engineering" : 10
- , "applications/file" : 11
- , "applications/internet" : 12
- , "applications/multimedia" : 13
- , "applications/productivity" : 14
- , "applications/publishing" : 15
- , "applications/system" : 16
- , "applications/text" : 17
- , "development/build tools" : 18
- , "development/debug" : 19
- , "development/debuggers" : 20
- , "development/documentation" : 21
- , "development/java" : 22
- , "development/languages" : 23
- , "development/libraries" : 24
- , "development/libraries/java" : 25
- , "development/system" : 26
- , "development/tools" : 27
- , "documentation" : 28
- , "internet/www/dynamic content" : 29
- , "system/libraries" : 30
- , "system environment/base" : 31
- , "system environment/daemons" : 32
- , "system environment/kernel" : 33
- , "system environment/libraries" : 34
- , "system environment/shells" : 35
- , "text processing/markup/xml" : 36
- , "user interface/desktops" : 37
- , "user interface/x" : 38
- , "user interface/x hardware support" : 39
- , "utilities" : 40
- }[group.lower()]
- except KeyError:
- logger = env.get_logger()
- if not group or group.lower() == "unspecified":
- logger.log_info("unspecified group '{}'".format(group))
- res = 2
- else:
- logger.log_error("failed to parse group '{}'".format(group))
- res = 0
- return pywbem.Uint16(res)
-
- @staticmethod
- def object_path2pkg(env, op, package_list='installed'):
+ def object_path2pkg(env, objpath, package_list='installed'):
"""
- @param op must contain precise information of package,
+ @param objpath must contain precise information of package,
otherwise a CIM_ERR_NOT_FOUND error is raised
@param package_list one of {'installed', 'all', 'available'}
says, where to look for given package
"""
- if not isinstance(op, pywbem.CIMInstanceName):
- raise TypeError("op must be an instance of CIMInstanceName")
+ if not isinstance(objpath, pywbem.CIMInstanceName):
+ raise TypeError("objpath must be an instance of CIMInstanceName")
if not isinstance(package_list, basestring):
raise TypeError("package_list must be a string")
if not package_list in ('installed', 'all', 'available'):
raise ValueError('unsupported package list "%s"'%package_list)
- tos = get_target_operating_system()[0]
- if ( not op['Name'] or not op['SoftwareElementID']
- or not op['SoftwareElementID'].startswith(op['Name'])
- or op['SoftwareElementID'].find(op['Version']) == -1):
+ if ( not objpath['Name'] or not objpath['SoftwareElementID']
+ or not objpath['SoftwareElementID'].startswith(objpath['Name'])
+ or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
-# if op['SoftwareElementState'] not in ("2", 2):
-# raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
-# "Only \"Executable\" software element state supported")
- if not check_target_operating_system(op['TargetOperatingSystem']):
+ if not check_target_operating_system(objpath['TargetOperatingSystem']):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong target operating system.")
- if not op['Name'] or not op['Version']:
+ if not objpath['Name'] or not objpath['Version']:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
'Both "Name" and "Version" must be given')
- m = re_nevra.match(op['SoftwareElementID'])
- if not m:
+ match = RE_NEVRA.match(objpath['SoftwareElementID'])
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong SotwareElementID. Expected valid nevra"
" (name-[epoch:]version-release.arch).")
- if op['Version'] != m.group('ver'):
+ if objpath['Version'] != match.group('ver'):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Version does not match version part in SoftwareElementID.")
evra = "{}:{}-{}.{}".format(*(
- (m.group(k) if k != "epoch" or m.group(k) else "0")
+ (match.group(k) if k != "epoch" or match.group(k) else "0")
for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as yb:
- pl = yb.doPackageLists(package_list,
+ with YumDB.getInstance(env) as ydb:
+ pkglist = ydb.doPackageLists(package_list,
showdups=package_list != 'installed')
if package_list != 'all':
- pl = getattr(pl, package_list)
+ pkglist = getattr(pkglist, package_list)
else:
# NOTE: available ∩ installed = ∅
- pl = itertools.chain(pl.available, pl.installed)
- exact,_,_ = yum.packages.parsePackages(pl, [op['Name']])
+ pkglist = itertools.chain(pkglist.available, pkglist.installed)
+ exact, _, _ = yum.packages.parsePackages(pkglist, [objpath['Name']])
for pkg in yum.misc.unique(exact):
- if pkg.evra == evra: break
- else:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
- return pkg
+ if pkg.evra == evra:
+ return pkg
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No matching package found.")
@staticmethod
- def object_path2pkg_search(env, op):
+ def object_path2pkg_search(env, objpath):
"""
similar to object_path2pkg, but tries to find best suitable
package matching keys
@@ -333,40 +304,45 @@ class SoftwarePackage:
If any matching package is already installed, it is returned.
Otherwise available package with highest version is returned.
- @param op may be object of CIMInstance or CIMInstanceName and
+ @param objpath may be object of CIMInstance or CIMInstanceName and
must contain at least \"Name\" or \"SoftwareElementID\"
@return instance of yum.rpmsack.RPMInstalledPackage in case of
installed package, otherwise yum.packages.YumAvailablePackage
"""
logger = env.get_logger()
- if isinstance(op, pywbem.CIMInstance):
+ if isinstance(objpath, pywbem.CIMInstance):
def _get_key(k):
- v = op.properties.get(k, None)
- if isinstance(v, pywbem.CIMProperty): return v.value
- if v is not None: return v
+ """@return value of instance's key"""
+ value = objpath.properties.get(k, None)
+ if isinstance(value, pywbem.CIMProperty):
+ return value.value
+ if value is not None:
+ return value
logger.log_error('missing key "{}" in inst.props'.format(k))
- return op.path[k] if k in op.path else None
- elif isinstance(op, pywbem.CIMInstanceName):
- _get_key = lambda k: op[k] if k in op else None
+ return objpath.path[k] if k in objpath.path else None
+ elif isinstance(objpath, pywbem.CIMInstanceName):
+ _get_key = lambda k: objpath[k] if k in objpath else None
else:
- raise TypeError("op must be either CIMInstance or CIMInstanceName")
+ raise TypeError("objpath must be either CIMInstance"
+ "or CIMInstanceName")
# parse and check arguments
match_props = {} # args for match_pkg
if _get_key('SoftwareElementID'):
- m = re_nevra.match(_get_key('SoftwareElementID'))
- if not m:
+ match = RE_NEVRA.match(_get_key('SoftwareElementID'))
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
"SoftwareElementID could not be parsed.")
for k in ('name', 'version', 'release', 'arch'):
- mk = k if k not in ('version', 'release') else k[:3]
- match_props[k] = m.group(mk)
- if not m.group("epoch"):
+ mkey = k if k not in ('version', 'release') else k[:3]
+ match_props[k] = match.group(mkey)
+ if not match.group("epoch"):
match_props["epoch"] = "0"
else:
for k in ('name', 'epoch', 'version', 'release', 'arch'):
- ik = k if k != 'arch' else "architecture"
- if _get_key(ik): match_props[k] = _get_key(ik)
+ ikey = k if k != 'arch' else "architecture"
+ if _get_key(ikey):
+ match_props[k] = _get_key(ikey)
if not match_props:
raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
@@ -376,10 +352,10 @@ class SoftwarePackage:
"Missing either Name or SoftwareElementID property.")
# get available packages
- pl = YumDB.getInstance(env).doPackageLists('all', showdups=True)
+ pkglist = YumDB.getInstance(env).doPackageLists('all', showdups=True)
# NOTE: available ∩ installed = ∅
- exact,_,_ = yum.packages.parsePackages(
- itertools.chain(pl.available, pl.installed),
+ exact, _, _ = yum.packages.parsePackages(
+ itertools.chain(pkglist.available, pkglist.installed),
[match_props['name']])
exact = yum.misc.unique(exact)
exact_orig = exact
@@ -401,63 +377,57 @@ class SoftwarePackage:
return exact[-1] # select highest version
@staticmethod
- def pkg2model_wrapper(namespace, classname):
- """
- @return a function that transforms YumAvailablePackage object
- to CIMInstanceName object
- """
-
- tos = pywbem.Uint16(get_target_operating_system()[0])
-
- def pkg2model(env, pkg, keys_only=True, model=None):
- """
- @param model if None, will be filled with data, otherwise
- a new instance of CIMInstance or CIMObjectPath is created
- """
- #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- #if not isinstance(pkg, yum.packages.YumHeaderPackage):
- if not isinstance(pkg, yum.packages.YumAvailablePackage):
- raise TypeError(
- "pkg must be an instance of YumAvailablePackage")
- if model is None:
- model = pywbem.CIMInstanceName(classname, namespace=namespace)
- if not keys_only:
- model = pywbem.CIMInstance(classname, path=model)
- if isinstance(model, pywbem.CIMInstance):
- def _set_key(k, v):
- model[k] = v
- model.path[k] = v
- else:
- _set_key = model.__setitem__
- with YumDB.getInstance(env):
- _set_key('Name', pkg.name)
- _set_key('SoftwareElementID', pkg.nevra)
- _set_key('SoftwareElementState', pywbem.Uint16(2
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
- else 1))
- _set_key('TargetOperatingSystem', tos)
- _set_key('Version', pkg.version)
- if not keys_only:
- model['Caption'] = pkg.summary
- model['Description'] = pkg.description
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- model['InstallDate'] = pywbem.CIMDateTime(
- datetime.fromtimestamp(pkg.installtime))
- if pkg.vendor:
- model['Manufacturer'] = pkg.vendor
- model['Release'] = pkg.release
- model['Epoch'] = pywbem.Uint16(pkg.epoch)
- model["Architecture"] = pkg.arch
- model['License'] = pkg.license
- model['Group'] = pkg.group
- model['Size'] = pywbem.Uint64(pkg.size)
- return model
-
- return pkg2model
+ def pkg2model(env, pkg, keys_only=True, model=None):
+ """
+ @param model if None, will be filled with data, otherwise
+ a new instance of CIMInstance or CIMObjectPath is created
+ """
+ #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ #if not isinstance(pkg, yum.packages.YumHeaderPackage):
+ if not isinstance(pkg, yum.packages.YumAvailablePackage):
+ raise TypeError(
+ "pkg must be an instance of YumAvailablePackage")
+ if model is None:
+ model = pywbem.CIMInstanceName('LMI_SoftwarePackage',
+ namespace='root/cimv2')
+ if not keys_only:
+ model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model)
+ if isinstance(model, pywbem.CIMInstance):
+ def _set_key(k, value):
+ """Sets the value of key property of cim instance"""
+ model[k] = value
+ model.path[k] = value #pylint: disable=E1103
+ else:
+ _set_key = model.__setitem__
+ with YumDB.getInstance(env):
+ _set_key('Name', pkg.name)
+ _set_key('SoftwareElementID', pkg.nevra)
+ _set_key('SoftwareElementState', pywbem.Uint16(2
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
+ else 1))
+ _set_key('TargetOperatingSystem',
+ pywbem.Uint16(get_target_operating_system()[0]))
+ _set_key('Version', pkg.version)
+ if not keys_only:
+ model['Caption'] = pkg.summary
+ model['Description'] = pkg.description
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ model['InstallDate'] = pywbem.CIMDateTime(
+ datetime.fromtimestamp(pkg.installtime))
+ if pkg.vendor:
+ model['Manufacturer'] = pkg.vendor
+ model['Release'] = pkg.release
+ model['Epoch'] = pywbem.Uint16(pkg.epoch)
+ model["Architecture"] = pkg.arch
+ model['License'] = pkg.license
+ model['Group'] = pkg.group
+ model['Size'] = pywbem.Uint64(pkg.size)
+ return model
class SoftwareFileCheck:
"""
Just a namespace for functions related to FileCheck provider.
+ TODO: make it a submodule
"""
passed_flags_descriptions = (
@@ -471,7 +441,7 @@ class SoftwareFileCheck:
"User Ownership", "Group Ownership",
"Modify Time")
- checksumtype_str2num = dict((v, k) for (k, v) in
+ checksumtype_str2num = dict((val, k) for (k, val) in
yum.constants.RPM_CHECKSUM_TYPES.items())
@staticmethod
@@ -483,23 +453,36 @@ class SoftwareFileCheck:
raise TypeError("pkg must be an instance of YumAvailablePackage")
if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
- with self: # ensure, that _yum is inited
+ with YumDB.getInstance(): # ensure, that _yum is inited
return SoftwareFileCheck.checksumtype_str2pywbem(
pkg.yumdb_info.checksum_type)
@staticmethod
def checksumtype_num2hash(csumt):
+ """
+ @param csumt checksum type as a number obtained from package
+ @return hash function object corresponding to csumt
+ """
return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])
@staticmethod
def checksumtype_str2pywbem(alg):
+ """
+ @param alg is a name of algorithm used for checksum
+ @return pywbem number corresponding to given alg
+ """
try:
res = SoftwareFileCheck.checksumtype_str2num[alg.lower()]
- except KeyError: res = 0
+ except KeyError:
+ res = 0
return pywbem.Uint16(res)
@staticmethod
- def filetype_str2pywbem(ft):
+ def filetype_str2pywbem(file_type):
+ """
+ @param file_type is a name of file type obtained from pkg headers
+ @return pywbem number corresponding to thus file type
+ """
try:
return pywbem.Uint16(
{ 'file' : 1
@@ -508,12 +491,16 @@ class SoftwareFileCheck:
, 'fifo' : 4
, 'character device' : 5
, 'block device' : 6
- }[ft])
+ }[file_type])
except KeyError:
return pywbem.Uint16(0)
@staticmethod
def filetype_mode2pywbem(mode):
+ """
+ @param mode is a raw file mode as integer
+ @return pywbem numeric value of file's type
+ """
for i, name in enumerate(
('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
if getattr(stat, 'S_IS' + name)(mode):
@@ -524,10 +511,12 @@ class SoftwareFileCheck:
def mode2pywbem_flags(mode):
"""
@param mode if None, file does not exist
+ @return list of integer flags describing file's access permissions
"""
- if mode is None: return None
+ if mode is None:
+ return None
flags = []
- for i, c in enumerate((
+ for i, flag in enumerate((
stat.S_IXOTH,
stat.S_IWOTH,
stat.S_IROTH,
@@ -540,7 +529,7 @@ class SoftwareFileCheck:
stat.S_ISVTX,
stat.S_ISGID,
stat.S_ISUID)):
- if c & mode:
+ if flag & mode:
flags.append(pywbem.Uint8(i))
return flags
@@ -555,9 +544,10 @@ class SoftwareFileCheck:
hashers = (hashers, )
buf = afile.read(blocksize)
while len(buf) > 0:
- for h in hashers: h.update(buf)
+ for hashfunc in hashers:
+ hashfunc.update(buf)
buf = afile.read(blocksize)
- return [ h.hexdigest() for h in hashers ]
+ return [ hashfunc.hexdigest() for hashfunc in hashers ]
@staticmethod
def compute_checksums(env, checksum_type, file_type, file_path):
@@ -570,91 +560,86 @@ class SoftwareFileCheck:
first one is always md5, the second one depends on checksum_type
if file does not exists, (None, None) is returned
"""
- hashers = [hashlib.md5()]
+ hashers = [hashlib.md5()] #pylint: disable=E1101
if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]:
- hashers.append(SoftwareFileCheck.checksumtype_num2hash(checksum_type)())
+ hashers.append(SoftwareFileCheck.checksumtype_num2hash(
+ checksum_type)())
if file_type != SoftwareFileCheck.filetype_str2pywbem('file'):
rslts = ['0'*len(h.hexdigest()) for h in hashers]
else:
try:
- with open(file_path, 'rb') as f:
- rslts = SoftwareFileCheck.hashfile(f, hashers)
- except (OSError, IOError) as e:
+ with open(file_path, 'rb') as fobj:
+ rslts = SoftwareFileCheck.hashfile(fobj, hashers)
+ except (OSError, IOError) as exc:
env.get_logger().log_error("could not open file \"%s\""
- " for reading: %s" % (file_path, e))
+ " for reading: %s" % (file_path, exc))
return None, None
return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)
@staticmethod
- def object_path2yumcheck(env, op):
+ def object_path2yumcheck(env, objpath):
"""
@return instance of yum.packages._RPMVerifyPackage
this object holds RPMInstalledPackage under its po attribute
"""
- if not isinstance(op, pywbem.CIMInstanceName):
- raise TypeError("op must be instance of CIMInstanceName, "
- "not \"%s\"" % op.__class__.__name__)
- log = env.get_logger()
-
- tos = get_target_operating_system()[0]
- if ( not op['Name'] or not op['SoftwareElementID']
- or not op['CheckID']
- or not op['CheckID'].endswith('#'+op['Name'])
- or op['SoftwareElementID'].find(op['Version']) == -1):
+ if not isinstance(objpath, pywbem.CIMInstanceName):
+ raise TypeError("objpath must be instance of CIMInstanceName, "
+ "not \"%s\"" % objpath.__class__.__name__)
+
+ if ( not objpath['Name'] or not objpath['SoftwareElementID']
+ or not objpath['CheckID']
+ or not objpath['CheckID'].endswith('#'+objpath['Name'])
+ or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if op['SoftwareElementState'] not in ("2", 2):
+ if objpath['SoftwareElementState'] not in ("2", 2):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Only \"Executable\" software element state supported")
- if not check_target_operating_system(op['TargetOperatingSystem']):
+ if not check_target_operating_system(objpath['TargetOperatingSystem']):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong target operating system.")
- if not op['Name'] or not op['Version']:
+ if not objpath['Name'] or not objpath['Version']:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
'Both "Name" and "Version" must be given')
- m = re_nevra.match(op['SoftwareElementID'])
- if not m:
+ match = RE_NEVRA.match(objpath['SoftwareElementID'])
+ if not match:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Wrong SotwareElementID. Expected valid nevra"
" (name-epoch:version-release.arch).")
- if op['Version'] != m.group('ver'):
+ if objpath['Version'] != match.group('ver'):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"Version does not match version part in SoftwareElementID.")
evra = "{}:{}-{}.{}".format(*(
- (m.group(k) if k != "epoch" or m.group(k) else "0")
+ (match.group(k) if k != "epoch" or match.group(k) else "0")
for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as yb:
- pl = yb.doPackageLists('installed')
- exact, matched, unmatched = yum.packages.parsePackages(
- pl.installed, [m.group('name')])
- exact = yum.misc.unique(exact)
- for pkg in exact:
- if pkg.evra == evra: break
- else:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ with YumDB.getInstance(env) as ydb:
+ pkglist = ydb.doPackageLists('installed')
+ exact, _, _ = yum.packages.parsePackages(
+ pkglist.installed, [match.group('name')])
+ for pkg in yum.misc.unique(exact):
+ if pkg.evra != evra:
+ continue
+ vpkg = yum.packages._RPMVerifyPackage(
+ pkg, pkg.hdr.fiFromHeader(),
+ SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
+ if not objpath['Name'] in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File not found in RPM package.")
+ return vpkg
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"No matching package installed.")
- fi = pkg.hdr.fiFromHeader()
- vpkg = yum.packages._RPMVerifyPackage(pkg, fi,
- SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
- if not op['Name'] in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File not found in RPM package.")
- return vpkg
-
- """
- Named tuple to store results of rpm file check as pywbem values,
- all results are in form:
- (expected, reality)
- where
- expected is value from rpm package
- reality is value obtained from installed file
- None means, that value could not be obtained
- except for "exists" and "md5_checksum" attributes, where "exists"
- is boolean and md5_checksum is a string
- for example:
- file_check.file_type == (4, 3)
- """
+ # Named tuple to store results of rpm file check as pywbem values,
+ # all results are in form:
+ # (expected, reality)
+ # where
+ # expected is value from rpm package
+ # reality is value obtained from installed file
+ # None means, that value could not be obtained
+ # except for "exists" and "md5_checksum" attributes, where "exists"
+ # is boolean and md5_checksum is a string
+ # for example:
+ # file_check.file_type == (4, 3)
FileCheck = collections.namedtuple('FileCheck',
'exists, md5_checksum, file_type, file_size, file_mode, '
'file_checksum, device, link_target, user_id, group_id, '
@@ -688,7 +673,8 @@ class SoftwareFileCheck:
else:
fstat = os.lstat(vpf.filename)
reality = {
- "file_type" : SoftwareFileCheck.filetype_mode2pywbem(fstat.st_mode),
+ "file_type" : SoftwareFileCheck.filetype_mode2pywbem(
+ fstat.st_mode),
"user_id" : pywbem.Uint32(fstat.st_uid),
"group_id" : pywbem.Uint32(fstat.st_gid),
"file_mode" : pywbem.Uint32(fstat.st_mode),
@@ -696,8 +682,8 @@ class SoftwareFileCheck:
"last_modification_time" : pywbem.Uint64(fstat.st_mtime)
}
reality["device"] = (pywbem.Uint64(fstat.st_dev)
- if reality['file_type'] == SoftwareFileCheck.filetype_str2pywbem(
- "device") else None)
+ if reality['file_type'] ==
+ SoftwareFileCheck.filetype_str2pywbem("device") else None)
reality["link_target"] = (os.readlink(vpf.filename)
if os.path.islink(vpf.filename) else None)
md5_checksum, checksum = SoftwareFileCheck.compute_checksums(
@@ -708,102 +694,119 @@ class SoftwareFileCheck:
return SoftwareFileCheck.FileCheck(**kwargs)
@staticmethod
- def filecheck_passed(fc):
- if not isinstance(fc, SoftwareFileCheck.FileCheck):
- raise TypeError("fc must be an instance of FileCheck")
- return ( fc.exists
- and all( v[0] == v[1]
- for k, v in fc._asdict().items() if (
- isinstance(v, tuple)
- and ( k != "last_modification_time"
- or fc.file_type[0] == \
+ def filecheck_passed(file_check):
+ """
+ @return True if installed file passed all checks.
+ """
+ if not isinstance(file_check, SoftwareFileCheck.FileCheck):
+ raise TypeError("file_check must be an instance of FileCheck")
+ return ( file_check.exists
+ and all( val[0] == val[1]
+ #pylint: disable=W0212
+ for k, val in file_check._asdict().items()
+ if ( isinstance(val, tuple)
+ and ( k != "last_modification_time"
+ or file_check.file_type[0] == \
SoftwareFileCheck.filetype_str2pywbem("file")
- ))))
+ ))))
+
+ @staticmethod
+ def _filecheck2model_flags(file_check):
+ """
+ @param file_check is an instance of FileCheck
+ @return pywbem value for PassedFlags property
+ """
+ flags = []
+ for k, value in file_check._asdict().items(): #pylint: disable=W0212
+ if isinstance(value, tuple):
+ if ( k != "last_modification_time"
+ or file_check.file_type[0] ==
+ SoftwareFileCheck.filetype_str2pywbem('file')):
+ # last_modification_time check is valid only for
+ # regular files
+ flag = file_check.exists and value[0] == value[1]
+ else:
+ flag = True
+ flags.append(flag)
+ elif isinstance(value, bool):
+ flags.append(value)
+ return flags
@staticmethod
- def filecheck_wrapper(namespace, classname):
-
- def _filecheck2model_flags(fc):
- flags = []
- for k, v in fc._asdict().items():
- if isinstance(v, tuple):
- if ( k != "last_modification_time"
- or fc.file_type[0] == SoftwareFileCheck.filetype_str2pywbem(
- 'file')):
- # last_modification_time check is valid only for
- # regular files
- flag = fc.exists and v[0] == v[1]
- else:
- flag = True
- flags.append(flag)
- elif isinstance(v, bool):
- flags.append(v)
- return flags
-
- def filecheck2model(vpkg, fn, env, keys_only=True,
- model=None, fc=None):
- if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
- raise TypeError(
- "vpkg must be an instance of _RPMVerifyPackage")
- if not fn in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File \"%s\" not found among package files" % fn)
- if model is None:
- model = pywbem.CIMInstanceName(classname, namespace=namespace)
- if not keys_only:
- model = pywbem.CIMInstance(classname, path=model)
- if fc is not None:
- if not isinstance(fc, SoftwareFileCheck.FileCheck):
- raise TypeError("fc must be an instance of FileCheck")
- pkg = vpkg.po
- vpf = vpkg._files[fn]
- model['Name'] = vpf.filename
- model['SoftwareElementID'] = pkg.nevra
- model['SoftwareElementState'] = pywbem.Uint16(2)
- model['TargetOperatingSystem'] = pywbem.Uint16(
- get_target_operating_system()[0])
- model['Version'] = pkg.version
- model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
+ def _fill_non_key_values(env, model, pkg, vpf, file_check):
+ """
+ Fills a non key values into instance of SoftwareFileCheck.
+ """
+ model['FileName'] = os.path.basename(vpf.filename)
+ model['FileChecksumType'] = csumt = \
+ pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
+ if file_check is None:
+ file_check = SoftwareFileCheck.test_file(env, csumt, vpf)
+ for mattr, fattr in (
+ ('FileType', 'file_type'),
+ ('FileUserID', 'user_id'),
+ ('FileGroupID', 'group_id'),
+ ('FileMode', 'file_mode'),
+ ('LastModificationTime', 'last_modification_time'),
+ ('FileSize', 'file_size'),
+ ('LinkTarget', 'link_target'),
+ ('FileChecksum', 'file_checksum')):
+ exp, rea = getattr(file_check, fattr)
+ if exp is not None:
+ model['Expected' + mattr] = exp
+ if rea is not None:
+ model[mattr] = rea
+ model['ExpectedFileModeFlags'] = \
+ SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[0])
+ if file_check.exists:
+ model['FileModeFlags'] = \
+ SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[1])
+ model['FileExists'] = file_check.exists
+ if file_check.md5_checksum is not None:
+ model['MD5Checksum'] = file_check.md5_checksum
+ model['PassedFlags'] = SoftwareFileCheck._filecheck2model_flags(
+ file_check)
+ model['PassedFlagsDescriptions'] = list(
+ SoftwareFileCheck.passed_flags_descriptions)
+
+ @staticmethod
+ def filecheck2model(vpkg, file_name, env, keys_only=True,
+ model=None, file_check=None):
+ """
+ @param vpkg is an instance of yum.packages_RPMVerifyPackage
+ @param file_name a absolute file path contained in package
+ @param keys_only if True, then only key values will be filed
+ @param model if given, then this instance will be modified and
+ returned
+ @param file_check if not given, it will be computed
+ @return instance of LMI_SoftwareFileCheck class with all desired
+ values filed
+ """
+ if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
+ raise TypeError(
+ "vpkg must be an instance of _RPMVerifyPackage")
+ if not file_name in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File \"%s\" not found among package files" % file_name)
+ if model is None:
+ model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck",
+ namespace="root/cimv2")
if not keys_only:
- #model['Caption'] = '' # TODO
- #model['CheckMode'] = bool() # TODO
- #model['CRC1'] = pywbem.Uint32() # TODO
- #model['CRC2'] = pywbem.Uint32() # TODO
- #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO
- #model['Description'] = '' # TODO
- #model['ElementName'] = '' # TODO
- #model['InstanceID'] = '' # TODO
- model['FileName'] = os.path.basename(vpf.filename)
- model['FileChecksumType'] = csumt = \
- pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
- if fc is None:
- fc = SoftwareFileCheck.test_file(env, csumt, vpf)
- for mattr, fattr in (
- ('FileType', 'file_type'),
- ('FileUserID', 'user_id'),
- ('FileGroupID', 'group_id'),
- ('FileMode', 'file_mode'),
- ('LastModificationTime', 'last_modification_time'),
- ('FileSize', 'file_size'),
- ('LinkTarget', 'link_target'),
- ('FileChecksum', 'file_checksum')):
- exp, rea = getattr(fc, fattr)
- if exp is not None:
- model['Expected' + mattr] = exp
- if rea is not None:
- model[mattr] = rea
- model['ExpectedFileModeFlags'] = \
- SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0])
- if fc.exists:
- model['FileModeFlags'] = SoftwareFileCheck.mode2pywbem_flags(
- fc.file_mode[1])
- model['FileExists'] = fc.exists
- if fc.md5_checksum is not None:
- model['MD5Checksum'] = fc.md5_checksum
- model['PassedFlags'] = _filecheck2model_flags(fc)
- model['PassedFlagsDescriptions'] = list(
- SoftwareFileCheck.passed_flags_descriptions)
- return model
-
- return filecheck2model
+ model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model)
+ if file_check is not None:
+ if not isinstance(file_check, SoftwareFileCheck.FileCheck):
+ raise TypeError("file_check must be an instance of FileCheck")
+ pkg = vpkg.po
+ vpf = vpkg._files[file_name]
+ model['Name'] = vpf.filename
+ model['SoftwareElementID'] = pkg.nevra
+ model['SoftwareElementState'] = pywbem.Uint16(2)
+ model['TargetOperatingSystem'] = pywbem.Uint16(
+ get_target_operating_system()[0])
+ model['Version'] = pkg.version
+ model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
+ if not keys_only:
+ SoftwareFileCheck._fill_non_key_values(
+ env, model, pkg, vpf, file_check)
+ return model