# -*- encoding: utf-8 -*-
# Software Management Providers
#
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Common utilities for LMI_Software* providers
"""
import collections
from datetime import datetime
import grp
import hashlib
import itertools
import os
import platform
import pwd
import re
import rpm
import socket
import stat
import pywbem
import yum
import cmpi_pywbem_bindings as pycimmb
from openlmi.software.util import singletonmixin
# Matches "epoch:version-release.arch"; the epoch part is mandatory here.
# Named groups: epoch, ver, rel, arch.
re_evra = re.compile(
    r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
# Matches "name-[epoch:]version-release.arch"; the epoch part is optional
# (its group is None when it is not present).
# Named groups: name, evra (everything after the name), epoch, ver, rel, arch.
re_nevra = re.compile(
    r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
# File whose modification time tells, whether the rpm database has changed.
rpmdb_path = '/var/lib/rpm/Packages'
class YumDB(singletonmixin.Singleton):
    """
    Singleton proxy for yum.YumBase.

    The wrapped YumBase object exists only while the instance is used as
    a context manager; the outermost "with" block creates and locks it,
    nested blocks just increase a counter. Attributes not found on this
    object are looked up on the wrapped YumBase.
    """

    # Consumed by singletonmixin.Singleton -- presumably lets getInstance()
    # be called repeatedly with arguments; TODO confirm against the mixin.
    ignoreSubsequent = True

    def __init__(self, env, *args, **kwargs):
        """
        @param env provider environment, used for logging
        @param args positional arguments passed to yum.YumBase
        @param kwargs keyword arguments passed to yum.YumBase
        """
        if not isinstance(env, pycimmb.ProviderEnvironment):
            raise TypeError("env must be instance of"
                    " pycimmb.ProviderEnvironment")
        self._yum_args = (args, kwargs)
        self._yum = None
        self._db_mtime = 0
        self._lock_cnt = 0
        self.env = env
        env.get_logger().log_info('init called')

    def is_dirty(self):
        """
        @return True if rpm database file was modified since the last
        call to update_db()
        """
        return self._db_mtime < os.stat(rpmdb_path).st_mtime

    def is_locked(self):
        """
        @return True if the wrapped YumBase object holds its lock file;
        False also when no YumBase object exists
        """
        return self._yum is not None and self._yum._lockfile is not None

    def update_db(self):
        """
        Remember the current modification time of rpm database and let
        the wrapped YumBase object set up its configuration, transaction
        set and rpm database again.
        """
        self.env.get_logger().log_info('updating rpmdb')
        self._db_mtime = os.stat(rpmdb_path).st_mtime
        self._yum.doConfigSetup()
        self._yum.doTsSetup()
        self._yum.doRpmDBSetup()

    def __enter__(self):
        self._lock_cnt += 1
        if self._lock_cnt < 2:
            # outermost block: create a fresh YumBase and lock it
            self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1])
            if not self.is_locked() and self.is_dirty():
                self.update_db()
            self._yum.doLock()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self._lock_cnt == 1:
            # Drop the YumBase object when the outermost block ends.
            # The attribute is reset instead of deleted: with "del" every
            # later access to self._yum ended up in __getattr__, which
            # itself reads self._yum, resulting in endless recursion.
            self._yum = None
        self._lock_cnt -= 1

    def __getattr__(self, name):
        # Called only for names not found on the instance or its class.
        # Instance dictionary is consulted directly, so that a missing
        # "_yum" can never re-enter this method.
        if self.__dict__.get('_yum') is None:
            raise AttributeError(
                    "'%s' object has no attribute '%s'"
                    " (yum base is not initialized, use the object"
                    " in a with statement)"
                    % (self.__class__.__name__, name))
        if not self.is_locked() and self.is_dirty():
            self.update_db()
        return getattr(self._yum, name)
def _get_distname():
    """
    @return lower-cased short name of running distribution as reported
    by the platform module; empty string, if the interpreter provides
    neither platform.linux_distribution nor platform.dist
    """
    if hasattr(platform, 'linux_distribution'):
        return platform.linux_distribution(
                full_distribution_name=False)[0].lower()
    if hasattr(platform, 'dist'):
        return platform.dist()[0].lower()
    # Neither helper is available; report an unknown distribution instead
    # of failing with AttributeError.
    return ''
def get_target_operating_system():
    """
    @return (val, text).
    Where val is a number from ValueMap of TargetOperatingSystem property
    of CIM_SoftwareElement class and text is its textual representation.
    """
    system_name = platform.system().lower()
    if system_name in ('macosx', 'darwin'):
        return (2, 'MACOS')
    if system_name != 'linux':
        # covers windows as well, there is no general value for it
        return (0, 'Unknown')

    known_distributions = {
        'redhat': (79, 'RedHat Enterprise Linux'),
        'suse': (81, 'SUSE'),
        'mandriva': (88, 'Mandriva'),
        'ubuntu': (93, 'Ubuntu'),
        'debian': (95, 'Debian'),
    }
    found = known_distributions.get(_get_distname())
    if found is not None:
        val, text = found
    else:
        # unknown distribution, decide by kernel release
        kernel_release = platform.uname()[2]
        if kernel_release.startswith('2.4'):
            val, text = 97, 'Linux 2.4.x'
        elif kernel_release.startswith('2.6'):
            val, text = 99, 'Linux 2.6.x'
        else:
            # generic linux, no check for x86_64
            return (36, 'LINUX')
    if platform.machine() == 'x86_64':
        # 64-bit variant is the value following the 32-bit one
        return (val + 1, text + ' 64-Bit')
    return (val, text)
def get_computer_system_op():
    """
    @return object path of CIM_ComputerSystem in root/cimv2 namespace
    with Name key set to the host name of this machine
    """
    system_keys = {
        "CreationClassName": "CIM_ComputerSystem",
        "Name": socket.gethostname(),
    }
    return pywbem.CIMInstanceName(
            classname='CIM_ComputerSystem',
            keybindings=system_keys,
            namespace="root/cimv2")
def check_target_operating_system(system):
    """
    @param system value of TargetOperatingSystem property given either
    as an integer or as a string holding an integer
    @return if param system matches current target operating system
    @raise TypeError if system is neither string nor integer
    """
    if isinstance(system, basestring):
        system = int(system)
    if not isinstance(system, (int, long)):
        raise TypeError("system must be either string or integer, not {}"
                .format(system.__class__.__name__))
    # get_target_operating_system() returns (val, text); only the numeric
    # part can be compared with system. Comparing with the whole tuple
    # never matched.
    tos = get_target_operating_system()[0]
    if system == tos:
        return True
    if system == 36:  # generic linux
        if platform.system().lower() == "linux":
            return True
    if 97 <= system <= 100:  # linux 2.x.x
        kernel_prefix = '2.4' if system < 99 else '2.6'
        # even values (98, 100) denote 64-bit variants
        wants_64bit = not (system % 2)
        if (   platform.uname()[2].startswith(kernel_prefix)
           and bool(platform.machine().endswith('64')) == bool(wants_64bit)):
            return True
    return False
def check_computer_system_op(env, system):
    """
    Check, that given object path refers to this computer system.

    @param env provider environment, its cimom handle is used for
    subclass checks
    @param system object path to check
    @return True, if system refers to this computer system
    @raise pywbem.CIMError with CIM_ERR_NOT_FOUND for any mismatch
    """
    def _not_found(message):
        return pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, message)

    if not isinstance(system, pywbem.CIMInstanceName):
        raise _not_found("\"System\" must be a CIMInstanceName")
    local_system = get_computer_system_op()
    cimom = env.get_cimom_handle()

    class_ok = cimom.is_subclass(system.namespace,
            sub=system.classname,
            super=local_system.classname)
    if not class_ok:
        raise _not_found(
                "Class of \"System\" must be a sublass of %s" %
                local_system.classname)

    if 'CreationClassName' not in system or 'Name' not in system:
        raise _not_found("\"System\" is missing one of keys")

    creation_class_ok = cimom.is_subclass(system.namespace,
            sub=system['CreationClassName'],
            super=local_system.classname)
    if not creation_class_ok:
        raise _not_found(
                "CreationClassName of \"System\" must be a sublass of %s" %
                local_system.classname)

    if system['Name'] != local_system['Name']:
        raise _not_found(
                "Name of \"System\" does not match \"%s\"" %
                local_system['Name'])
    return True
def match_pkg(pkg, **kwargs):
    """
    Compare attributes of pkg with given keyword arguments.

    Only arguments with a true value are considered; None and empty
    values are skipped. Recognized arguments:
        name, epoch, version, release, arch
        evra, nevra
    Any other argument is ignored.

    @return True, if every considered argument equals the attribute of
    pkg with the same name
    """
    attributes = (
        'evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch')
    requested = ((attr, kwargs.get(attr)) for attr in attributes)
    # the attribute of pkg is read only for arguments actually given
    return not any(
        getattr(pkg, attr) != wanted
        for attr, wanted in requested if wanted)
class SoftwarePackage:
    """
    Just a namespace for common function related to SoftwarePackage provider.
    """

    # Values returned by parse_group() for known (lower-cased) rpm groups.
    _GROUP_IDS = {
        "amusements/games": 3,
        "amusements/graphics": 4,
        "applications/archiving": 5,
        "applications/communications": 6,
        "applications/databases": 7,
        "applications/editors": 8,
        "applications/emulators": 9,
        "applications/engineering": 10,
        "applications/file": 11,
        "applications/internet": 12,
        "applications/multimedia": 13,
        "applications/productivity": 14,
        "applications/publishing": 15,
        "applications/system": 16,
        "applications/text": 17,
        "development/build tools": 18,
        "development/debug": 19,
        "development/debuggers": 20,
        "development/documentation": 21,
        "development/java": 22,
        "development/languages": 23,
        "development/libraries": 24,
        "development/libraries/java": 25,
        "development/system": 26,
        "development/tools": 27,
        "documentation": 28,
        "internet/www/dynamic content": 29,
        "system/libraries": 30,
        "system environment/base": 31,
        "system environment/daemons": 32,
        "system environment/kernel": 33,
        "system environment/libraries": 34,
        "system environment/shells": 35,
        "text processing/markup/xml": 36,
        "user interface/desktops": 37,
        "user interface/x": 38,
        "user interface/x hardware support": 39,
        "utilities": 40,
    }

    @staticmethod
    def parse_group(group, env):
        """
        @param group name of rpm group, case does not matter; may be
        None or empty
        @param env provider environment, used for logging
        @return pywbem.Uint16 value for given group; 2 for missing or
        "unspecified" group, 0 for a group that is not known
        """
        # a missing group (None) is handled like an empty one instead
        # of failing on None.lower()
        key = group.lower() if group else ""
        res = SoftwarePackage._GROUP_IDS.get(key)
        if res is None:
            logger = env.get_logger()
            if not key or key == "unspecified":
                logger.log_info("unspecified group '{}'".format(group))
                res = 2
            else:
                logger.log_error("failed to parse group '{}'".format(group))
                res = 0
        return pywbem.Uint16(res)

    @staticmethod
    def object_path2pkg(env, op, package_list='installed'):
        """
        @param op must contain precise information of package,
        otherwise a CIM_ERR_NOT_FOUND error is raised
        @param package_list one of {'installed', 'all', 'available'}
        says, where to look for given package
        @return package object with the same name and evra as given
        by keys of op
        @raise TypeError for op or package_list of wrong type
        @raise ValueError for unsupported package_list
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be an instance of CIMInstanceName")
        if not isinstance(package_list, basestring):
            raise TypeError("package_list must be a string")
        if package_list not in ('installed', 'all', 'available'):
            raise ValueError('unsupported package list "%s"'%package_list)
        # This check has to precede the consistency check below, which
        # passes Version to str.find() and would fail with TypeError
        # for a missing value.
        if not op['Name'] or not op['Version']:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    'Both "Name" and "Version" must be given')
        if (  not op['SoftwareElementID']
           or not op['SoftwareElementID'].startswith(op['Name'])
           or op['SoftwareElementID'].find(op['Version']) == -1):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        if not check_target_operating_system(op['TargetOperatingSystem']):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong target operating system.")
        m = re_nevra.match(op['SoftwareElementID'])
        if not m:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong SotwareElementID. Expected valid nevra"
                    " (name-[epoch:]version-release.arch).")
        if op['Version'] != m.group('ver'):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Version does not match version part in SoftwareElementID.")
        # missing epoch stands for epoch 0
        evra = "{}:{}-{}.{}".format(m.group('epoch') or "0",
                m.group('ver'), m.group('rel'), m.group('arch'))
        with YumDB.getInstance(env) as yb:
            pl = yb.doPackageLists(package_list,
                    showdups=package_list != 'installed')
            if package_list != 'all':
                pl = getattr(pl, package_list)
            else:
                # NOTE: available and installed lists are disjoint
                pl = itertools.chain(pl.available, pl.installed)
            exact, _, _ = yum.packages.parsePackages(pl, [op['Name']])
            for pkg in yum.misc.unique(exact):
                if pkg.evra == evra:
                    break
            else:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "No matching package found.")
        return pkg

    @staticmethod
    def object_path2pkg_search(env, op):
        """
        similar to object_path2pkg, but tries to find best suitable
        package matching keys

        If any matching package is already installed, it is returned.
        Otherwise available package with highest version is returned.

        @param op may be object of CIMInstance or CIMInstanceName and
        must contain at least \"Name\" or \"SoftwareElementID\"
        @return instance of yum.rpmsack.RPMInstalledPackage in case of
        installed package, otherwise yum.packages.YumAvailablePackage
        @raise TypeError if op is of any other type
        @raise pywbem.CIMError if keys can not be parsed or no package
        matches
        """
        logger = env.get_logger()
        if isinstance(op, pywbem.CIMInstance):
            def _get_key(k):
                v = op.properties.get(k, None)
                if isinstance(v, pywbem.CIMProperty):
                    return v.value
                if v is not None:
                    return v
                logger.log_error('missing key "{}" in inst.props'.format(k))
                # instance does not have to carry an object path
                if op.path is not None and k in op.path:
                    return op.path[k]
                return None
        elif isinstance(op, pywbem.CIMInstanceName):
            _get_key = lambda k: op[k] if k in op else None
        else:
            raise TypeError("op must be either CIMInstance or CIMInstanceName")

        # parse and check arguments
        match_props = {} # args for match_pkg
        element_id = _get_key('SoftwareElementID')
        if element_id:
            m = re_nevra.match(element_id)
            if not m:
                raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
                        "SoftwareElementID could not be parsed.")
            for k, group in ( ('name', 'name'), ('version', 'ver')
                            , ('release', 'rel'), ('arch', 'arch')):
                match_props[k] = m.group(group)
            # Epoch given in SoftwareElementID has to be matched as well
            # (it used to be ignored); missing epoch stands for 0.
            match_props["epoch"] = m.group("epoch") or "0"
        else:
            for k in ('name', 'epoch', 'version', 'release', 'arch'):
                ik = k if k != 'arch' else "architecture"
                value = _get_key(ik)
                if not value:
                    continue
                # Epoch property is numeric in the model, while match_pkg
                # compares it with epoch of package, assumed to be
                # a string -- TODO confirm against yum package objects.
                match_props[k] = str(value) if k == 'epoch' else value
        if not match_props:
            raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
                    "Too few key values given (give at least a Name).")
        if 'name' not in match_props:
            raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
                    "Missing either Name or SoftwareElementID property.")

        # get available packages; yum base exists only inside
        # of the with block
        with YumDB.getInstance(env) as yb:
            pl = yb.doPackageLists('all', showdups=True)
            # NOTE: available and installed lists are disjoint
            exact, _, _ = yum.packages.parsePackages(
                    itertools.chain(pl.available, pl.installed),
                    [match_props['name']])
            candidates = yum.misc.unique(exact)
        # NOTE(review): evra is compared as a plain string here, which
        # does not have to agree with rpm version ordering.
        exact = sorted( [ p for p in candidates
                          if match_pkg(p, **match_props) ]
                      , key=lambda a: a.evra)
        if not exact:
            logger.log_error('could not find any package for query: {}'
                    ' in list: {}'
                    .format(match_props, [p.nevra for p in candidates]))
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "No matching package found.")
        for pkg in exact: # check, whether package is already installed
            if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
                return pkg
        logger.log_info(('found multiple matching packages'
                ' for query: {}' if len(exact) > 1 else
                'exact match found for query: {}')
                .format(match_props))
        return exact[-1] # select highest version

    @staticmethod
    def pkg2model_wrapper(namespace, classname):
        """
        @param namespace namespace of created object paths
        @param classname class name of created objects
        @return a function that transforms YumAvailablePackage object
        to CIMInstanceName object
        """
        tos = pywbem.Uint16(get_target_operating_system()[0])

        def pkg2model(env, pkg, keys_only=True, model=None):
            """
            @param pkg package object to transform
            @param keys_only if True, only key properties are set
            @param model if not None, it is filled with data, otherwise
            a new instance of CIMInstanceName (or CIMInstance, if
            keys_only is False) is created
            @return filled model
            """
            if not isinstance(pkg, yum.packages.YumAvailablePackage):
                raise TypeError(
                        "pkg must be an instance of YumAvailablePackage")
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            if isinstance(model, pywbem.CIMInstance):
                # keys are stored in both instance and its object path
                def _set_key(k, v):
                    model[k] = v
                    model.path[k] = v
            else:
                _set_key = model.__setitem__
            installed = isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
            with YumDB.getInstance(env):
                _set_key('Name', pkg.name)
                _set_key('SoftwareElementID', pkg.nevra)
                _set_key('SoftwareElementState',
                        pywbem.Uint16(2 if installed else 1))
                _set_key('TargetOperatingSystem', tos)
                _set_key('Version', pkg.version)
                if not keys_only:
                    model['Caption'] = pkg.summary
                    model['Description'] = pkg.description
                    if installed:
                        model['InstallDate'] = pywbem.CIMDateTime(
                                datetime.fromtimestamp(pkg.installtime))
                    if pkg.vendor:
                        model['Manufacturer'] = pkg.vendor
                    model['Release'] = pkg.release
                    model['Epoch'] = pywbem.Uint16(pkg.epoch)
                    model["Architecture"] = pkg.arch
                    model['License'] = pkg.license
                    model['Group'] = pkg.group
                    model['Size'] = pywbem.Uint64(pkg.size)
            return model

        return pkg2model
class SoftwareFileCheck:
    """
    Just a namespace for functions related to FileCheck provider.
    """

    # Descriptions of PassedFlags items; their order follows the order
    # of boolean and tuple fields of FileCheck.
    passed_flags_descriptions = (
        "Existence",
        "File Type",
        "File Size",
        "File Mode",
        "File Checksum",
        "Device major/minor number",
        "Symlink Target",
        "User Ownership", "Group Ownership",
        "Modify Time")

    # Inverse of yum.constants.RPM_CHECKSUM_TYPES: algorithm name -> number
    checksumtype_str2num = dict((v, k) for (k, v) in
            yum.constants.RPM_CHECKSUM_TYPES.items())

    # Named tuple to store results of rpm file check as pywbem values,
    # all results are in form:
    #   (expected, reality)
    # where
    #   expected is value from rpm package
    #   reality is value obtained from installed file
    # None means, that value could not be obtained
    # except for "exists" and "md5_checksum" attributes, where "exists"
    # is boolean and md5_checksum is a string
    # for example:
    #   file_check.file_type == (4, 3)
    FileCheck = collections.namedtuple('FileCheck',
            'exists, md5_checksum, file_type, file_size, file_mode, '
            'file_checksum, device, link_target, user_id, group_id, '
            'last_modification_time')

    @staticmethod
    def pkg_checksum_type(pkg, env=None):
        """
        @param pkg package object
        @param env optional provider environment used to obtain YumDB
        for packages, that are not installed
        @return integer representation of checksum type
        """
        if not isinstance(pkg, yum.packages.YumAvailablePackage):
            raise TypeError("pkg must be an instance of YumAvailablePackage")
        if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
            return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
        # This used to be "with self:", which can not work in a static
        # method (NameError). YumDB is used instead to ensure, that yum
        # base is initialized.
        # NOTE(review): without env this relies on YumDB.getInstance()
        # returning the already created singleton -- confirm against
        # singletonmixin.
        if env is not None:
            yum_db = YumDB.getInstance(env)
        else:
            yum_db = YumDB.getInstance()
        with yum_db:
            return SoftwareFileCheck.checksumtype_str2pywbem(
                    pkg.yumdb_info.checksum_type)

    @staticmethod
    def checksumtype_num2hash(csumt):
        """
        @param csumt numeric checksum type
        @return constructor of hash objects from hashlib for given type
        """
        return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])

    @staticmethod
    def checksumtype_str2pywbem(alg):
        """
        @param alg name of checksum algorithm, case does not matter
        @return pywbem.Uint16 with numeric checksum type, 0 for unknown
        algorithm
        """
        res = SoftwareFileCheck.checksumtype_str2num.get(alg.lower(), 0)
        return pywbem.Uint16(res)

    @staticmethod
    def filetype_str2pywbem(ft):
        """
        @param ft name of file type
        @return pywbem.Uint16 value of file type, 0 for unknown name
        """
        file_types = {
            'file': 1,
            'directory': 2,
            'symlink': 3,
            'fifo': 4,
            'character device': 5,
            'block device': 6,
        }
        return pywbem.Uint16(file_types.get(ft, 0))

    @staticmethod
    def filetype_mode2pywbem(mode):
        """
        @param mode st_mode value of a file
        @return pywbem.Uint16 value of file type (same values as
        filetype_str2pywbem returns), 0 if none of tested types matches
        """
        for i, name in enumerate(
                ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
            if getattr(stat, 'S_IS' + name)(mode):
                return pywbem.Uint16(i)
        return pywbem.Uint16(0)

    @staticmethod
    def mode2pywbem_flags(mode):
        """
        @param mode if None, file does not exist
        @return list of pywbem.Uint8 indexes of permission bits set in
        mode; None if mode is None
        """
        if mode is None:
            return None
        # index of a bit in this tuple is the value of its flag
        mode_bits = (
            stat.S_IXOTH,
            stat.S_IWOTH,
            stat.S_IROTH,
            stat.S_IXGRP,
            stat.S_IWGRP,
            stat.S_IRGRP,
            stat.S_IXUSR,
            stat.S_IWUSR,
            stat.S_IRUSR,
            stat.S_ISVTX,
            stat.S_ISGID,
            stat.S_ISUID)
        return [pywbem.Uint8(i) for i, c in enumerate(mode_bits) if c & mode]

    @staticmethod
    def hashfile(afile, hashers, blocksize=65536):
        """
        @param afile file object opened for reading
        @param hashers is a list of hash objects; single hash object
        is accepted as well
        @param blocksize number of bytes read from afile at once
        @return list of digest strings (in hex format) for each hash object
        given in the same order
        """
        if not isinstance(hashers, (tuple, list, set, frozenset)):
            hashers = (hashers, )
        while True:
            buf = afile.read(blocksize)
            if not buf:
                break
            for h in hashers:
                h.update(buf)
        return [h.hexdigest() for h in hashers]

    @staticmethod
    def compute_checksums(env, checksum_type, file_type, file_path):
        """
        @param file_type if it is not a regular file, strings of zeroes
        are returned instead of checksums
        @param checksum_type selected hash algorithm to compute second
        checksum
        @return (md5sum, checksum)
        both checksums are computed from file_path's content
        first one is always md5, the second one depends on checksum_type
        if file can not be read, (None, None) is returned
        """
        hashers = [hashlib.md5()]
        if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]:
            hashers.append(
                    SoftwareFileCheck.checksumtype_num2hash(checksum_type)())
        if file_type != SoftwareFileCheck.filetype_str2pywbem('file'):
            rslts = ['0'*len(h.hexdigest()) for h in hashers]
        else:
            try:
                with open(file_path, 'rb') as f:
                    rslts = SoftwareFileCheck.hashfile(f, hashers)
            except (OSError, IOError) as e:
                env.get_logger().log_error("could not open file \"%s\""
                        " for reading: %s" % (file_path, e))
                return None, None
        # With md5 as selected algorithm there is just one digest and
        # both checksums are the same. The digest string used to be
        # doubled here ("rslts[0]*2"), which could never match
        # an md5 digest.
        return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0])

    @staticmethod
    def object_path2yumcheck(env, op):
        """
        @param op object path of file check; must contain precise
        information, otherwise a CIM_ERR_NOT_FOUND error is raised
        @return instance of yum.packages._RPMVerifyPackage
        this object holds RPMInstalledPackage under its po attribute
        @raise TypeError if op is not an instance of CIMInstanceName
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be instance of CIMInstanceName, "
                    "not \"%s\"" % op.__class__.__name__)
        # This check has to precede the consistency check below, which
        # passes Version to str.find() and would fail with TypeError
        # for a missing value.
        if not op['Name'] or not op['Version']:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    'Both "Name" and "Version" must be given')
        if (  not op['SoftwareElementID']
           or not op['CheckID']
           or not op['CheckID'].endswith('#'+op['Name'])
           or op['SoftwareElementID'].find(op['Version']) == -1):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        if op['SoftwareElementState'] not in ("2", 2):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Only \"Executable\" software element state supported")
        if not check_target_operating_system(op['TargetOperatingSystem']):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong target operating system.")
        m = re_nevra.match(op['SoftwareElementID'])
        if not m:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong SotwareElementID. Expected valid nevra"
                    " (name-epoch:version-release.arch).")
        if op['Version'] != m.group('ver'):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Version does not match version part in SoftwareElementID.")
        # missing epoch stands for epoch 0
        evra = "{}:{}-{}.{}".format(m.group('epoch') or "0",
                m.group('ver'), m.group('rel'), m.group('arch'))
        with YumDB.getInstance(env) as yb:
            pl = yb.doPackageLists('installed')
            exact, _, _ = yum.packages.parsePackages(
                    pl.installed, [m.group('name')])
            for pkg in yum.misc.unique(exact):
                if pkg.evra == evra:
                    break
            else:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "No matching package installed.")
            fi = pkg.hdr.fiFromHeader()
            vpkg = yum.packages._RPMVerifyPackage(pkg, fi,
                    SoftwareFileCheck.pkg_checksum_type(pkg, env), [], True)
            # Name key holds path of checked file
            if op['Name'] not in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File not found in RPM package.")
        return vpkg

    @staticmethod
    def test_file(env, checksum_type, vpf):
        """
        @param checksum type is a pywbem value for ChecksumType property
        @param vpf file of rpm package to check
        @return instance of FileCheck
        @raise TypeError if vpf is not _RPMVerifyPackageFile
        """
        if not isinstance(vpf, yum.packages._RPMVerifyPackageFile):
            raise TypeError("vpf must be an instance of _RPMVerifyPackage,"
                    " not \"%s\"" % vpf.__class__.__name__)

        def _lookup_id(lookup, name, attr):
            # User or group recorded in package does not have to exist
            # on this system; expected id is unknown (None) then.
            try:
                return pywbem.Uint32(getattr(lookup(name), attr))
            except KeyError:
                return None

        device_types = (
            SoftwareFileCheck.filetype_str2pywbem('character device'),
            SoftwareFileCheck.filetype_str2pywbem('block device'))
        exists = os.path.lexists(vpf.filename)
        md5_checksum = None
        expected = {
            "file_type"     : SoftwareFileCheck.filetype_str2pywbem(vpf.ftype),
            "user_id"       : _lookup_id(pwd.getpwnam, vpf.user, 'pw_uid'),
            "group_id"      : _lookup_id(grp.getgrnam, vpf.group, 'gr_gid'),
            "file_mode"     : pywbem.Uint32(vpf.mode),
            "file_size"     : pywbem.Uint64(vpf.size),
            "link_target"   : vpf.readlink if vpf.readlink else None,
            "file_checksum" : vpf.digest[1],
            "device"        : (pywbem.Uint64(vpf.dev)
                if vpf.ftype.endswith('device') else None),
            "last_modification_time" : pywbem.Uint64(vpf.mtime)
        }
        if not exists:
            # nothing can be obtained from a missing file
            reality = collections.defaultdict(lambda: None)
        else:
            fstat = os.lstat(vpf.filename)
            reality = {
                "file_type" : SoftwareFileCheck.filetype_mode2pywbem(
                    fstat.st_mode),
                "user_id"   : pywbem.Uint32(fstat.st_uid),
                "group_id"  : pywbem.Uint32(fstat.st_gid),
                "file_mode" : pywbem.Uint32(fstat.st_mode),
                "file_size" : pywbem.Uint64(fstat.st_size),
                "last_modification_time" : pywbem.Uint64(fstat.st_mtime)
            }
            # Device number makes sense for device nodes only. File type
            # used to be compared with filetype_str2pywbem("device"),
            # which is 0 (unknown type), so device nodes were never
            # recognized.
            # NOTE(review): st_dev is the device containing the file;
            # rpm may record st_rdev of the node instead -- confirm.
            reality["device"] = (pywbem.Uint64(fstat.st_dev)
                    if reality['file_type'] in device_types else None)
            reality["link_target"] = (os.readlink(vpf.filename)
                    if os.path.islink(vpf.filename) else None)
            md5_checksum, checksum = SoftwareFileCheck.compute_checksums(
                    env, checksum_type, reality["file_type"], vpf.filename)
            reality["file_checksum"] = checksum
        kwargs = dict(exists=exists, md5_checksum=md5_checksum,
                **dict((k, (expected[k], reality[k])) for k in expected))
        return SoftwareFileCheck.FileCheck(**kwargs)

    @staticmethod
    def filecheck_passed(fc):
        """
        @param fc instance of FileCheck
        @return True, if file exists and all its checked attributes have
        expected values; modification time is compared only for regular
        files
        """
        if not isinstance(fc, SoftwareFileCheck.FileCheck):
            raise TypeError("fc must be an instance of FileCheck")
        if not fc.exists:
            return fc.exists
        check_mtime = (fc.file_type[0] ==
                SoftwareFileCheck.filetype_str2pywbem("file"))
        return all( v[0] == v[1]
                for k, v in fc._asdict().items()
                if  isinstance(v, tuple)
                and (k != "last_modification_time" or check_mtime))

    @staticmethod
    def filecheck_wrapper(namespace, classname):
        """
        @param namespace namespace of created object paths
        @param classname class name of created objects
        @return a function, that transforms a file of
        yum.packages._RPMVerifyPackage to CIMInstanceName or CIMInstance
        """

        def _filecheck2model_flags(fc):
            """
            @return list of booleans, one for each boolean or tuple field
            of fc in order of fields; True means passed check
            """
            flags = []
            for k, v in fc._asdict().items():
                if isinstance(v, tuple):
                    if (  k != "last_modification_time"
                       or fc.file_type[0] ==
                            SoftwareFileCheck.filetype_str2pywbem('file')):
                        flag = fc.exists and v[0] == v[1]
                    else:
                        # last_modification_time check is valid only for
                        # regular files
                        flag = True
                    flags.append(flag)
                elif isinstance(v, bool):
                    flags.append(v)
            return flags

        def filecheck2model(vpkg, fn, env, keys_only=True,
                model=None, fc=None):
            """
            @param vpkg instance of yum.packages._RPMVerifyPackage
            @param fn path of a file belonging to vpkg
            @param keys_only if True, only key properties are set
            @param model if not None, it is filled with data, otherwise
            a new object is created
            @param fc result of test_file for given file; if None
            and keys_only is False, the file is checked here
            @return filled model
            """
            if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
                raise TypeError(
                        "vpkg must be an instance of _RPMVerifyPackage")
            if fn not in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File \"%s\" not found among package files" % fn)
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            if (   fc is not None
               and not isinstance(fc, SoftwareFileCheck.FileCheck)):
                raise TypeError("fc must be an instance of FileCheck")
            pkg = vpkg.po
            vpf = vpkg._files[fn]
            model['Name'] = vpf.filename
            model['SoftwareElementID'] = pkg.nevra
            model['SoftwareElementState'] = pywbem.Uint16(2)
            model['TargetOperatingSystem'] = pywbem.Uint16(
                    get_target_operating_system()[0])
            model['Version'] = pkg.version
            model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
            if not keys_only:
                #model['Caption'] = '' # TODO
                #model['CheckMode'] = bool() # TODO
                #model['CRC1'] = pywbem.Uint32() # TODO
                #model['CRC2'] = pywbem.Uint32() # TODO
                #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO
                #model['Description'] = '' # TODO
                #model['ElementName'] = '' # TODO
                #model['InstanceID'] = '' # TODO
                model['FileName'] = os.path.basename(vpf.filename)
                model['FileChecksumType'] = csumt = pywbem.Uint16(
                        SoftwareFileCheck.pkg_checksum_type(pkg, env))
                if fc is None:
                    fc = SoftwareFileCheck.test_file(env, csumt, vpf)
                for mattr, fattr in (
                        ('FileType', 'file_type'),
                        ('FileUserID', 'user_id'),
                        ('FileGroupID', 'group_id'),
                        ('FileMode', 'file_mode'),
                        ('LastModificationTime', 'last_modification_time'),
                        ('FileSize', 'file_size'),
                        ('LinkTarget', 'link_target'),
                        ('FileChecksum', 'file_checksum')):
                    exp, rea = getattr(fc, fattr)
                    # values, that could not be obtained, are left unset
                    if exp is not None:
                        model['Expected' + mattr] = exp
                    if rea is not None:
                        model[mattr] = rea
                model['ExpectedFileModeFlags'] = \
                        SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0])
                if fc.exists:
                    model['FileModeFlags'] = \
                            SoftwareFileCheck.mode2pywbem_flags(
                                    fc.file_mode[1])
                model['FileExists'] = fc.exists
                if fc.md5_checksum is not None:
                    model['MD5Checksum'] = fc.md5_checksum
                model['PassedFlags'] = _filecheck2model_flags(fc)
                model['PassedFlagsDescriptions'] = list(
                        SoftwareFileCheck.passed_flags_descriptions)
            return model

        return filecheck2model