diff options
Diffstat (limited to 'src/software/openlmi/software/util/common.py')
-rw-r--r-- | src/software/openlmi/software/util/common.py | 825 |
1 files changed, 0 insertions, 825 deletions
diff --git a/src/software/openlmi/software/util/common.py b/src/software/openlmi/software/util/common.py deleted file mode 100644 index 685171a..0000000 --- a/src/software/openlmi/software/util/common.py +++ /dev/null @@ -1,825 +0,0 @@ -# -*- encoding: utf-8 -*- -# Software Management Providers -# -# Copyright (C) 2012 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Michal Minar <miminar@redhat.com> -# - -"""Common utilities for LMI_Software* providers -""" - -import collections -from datetime import datetime -import grp -import hashlib -import itertools -import os -import platform -import pwd -import re -import rpm -import socket -import stat -import pywbem -import yum -import cmpi_pywbem_bindings as pycimmb -from openlmi.software.util import singletonmixin - -RE_EVRA = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)' - r'-(?P<rel>.+)\.(?P<arch>[^.]+)$') -RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' - r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') - -RPMDB_PATH = '/var/lib/rpm/Packages' - -class YumDB(singletonmixin.Singleton): - """ - Context manager for accessing yum/rpm database. - """ - - # this is to inform Singleton, that __init__ should be called only once - ignoreSubsequent = True - - def __init__(self, env, *args, **kwargs): #pylint: disable=W0231 - if not isinstance(env, pycimmb.ProviderEnvironment): - raise TypeError("env must be instance of" - " pycimmb.ProviderEnvironment") - self._yum_args = (args, kwargs) - self._yum = None - self._db_mtime = 0 - self._lock_cnt = 0 - self.env = env - env.get_logger().log_info('init called') - - def is_dirty(self): - """ - @return True if rpm database has been modified since last update - """ - return self._db_mtime < os.stat(RPMDB_PATH).st_mtime - - def is_locked(self): - """ - @return True if rpm database is locked - """ - return self._yum._lockfile is not None - - def update_db(self): - """ - Call to update database metadata. - """ - self.env.get_logger().log_info('updating rpmdb') - self._db_mtime = os.stat(RPMDB_PATH).st_mtime - self._yum.doConfigSetup() - self._yum.doTsSetup() - self._yum.doRpmDBSetup() - - def __enter__(self): - self._lock_cnt += 1 - if self._lock_cnt < 2: - self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1]) - if not self.is_locked() and self.is_dirty(): - self.update_db() - self._yum.doLock() - return self - - def __exit__(self, exc_type, exc_value, traceback): - if self._lock_cnt == 1: - del self._yum - self._lock_cnt -= 1 - - def __getattr__(self, name): - if not self.is_locked() and self.is_dirty(): - self.update_db() - return getattr(self._yum, name) - - -def _get_distname(): - """ - @return name of linux distribution - """ - if hasattr(platform, 'linux_distribution'): - return platform.linux_distribution( - full_distribution_name=False)[0].lower() - else: - return platform.dist()[0].lower() - - -def get_target_operating_system(): - """ - @return (val, text). - Where val is a number from ValueMap of TargetOperatingSystem property - of CIM_SoftwareElement class and text is its testual representation. - """ - - system = platform.system() - if system.lower() == 'linux': - try: - val, dist = \ - { 'redhat' : (79, 'RedHat Enterprise Linux') - , 'suse' : (81, 'SUSE') - , 'mandriva' : (88, 'Mandriva') - , 'ubuntu' : (93, 'Ubuntu') - , 'debian' : (95, 'Debian') - }[_get_distname()] - except KeyError: - linrel = platform.uname()[2] - if linrel.startswith('2.4'): - val, dist = (97, 'Linux 2.4.x') - elif linrel.startswith('2.6'): - val, dist = (99, 'Linux 2.6.x') - else: - return (36, 'LINUX') # no check for x86_64 - if platform.machine() == 'x86_64': - val += 1 - dist += ' 64-Bit' - return (val, dist) - elif system.lower() in ('macosx', 'darwin'): - return (2, 'MACOS') - # elif system.lower() == 'windows': # no general value - else: - return (0, 'Unknown') - -def get_computer_system_op(prefix='Linux'): - """ - @return object path of CIM_ComputerSystem for this system - """ - return pywbem.CIMInstanceName( - classname='%s_ComputerSystem' % prefix, - keybindings={ - "CreationClassName": "%s_ComputerSystem" % prefix - , "Name" : socket.gethostname() }, - namespace="root/cimv2") - -def check_target_operating_system(system): - """ - @return if param system matches current target operating system - """ - if isinstance(system, basestring): - system = int(system) - if not isinstance(system, (int, long)): - raise TypeError("system must be either string or integer, not {}" - .format(system.__class__.__name__)) - tos = get_target_operating_system() - if system == tos: - return True - if system == 36: # linux - if platform.system().lower() == "linux": - return True - if ( system >= 97 and system <= 100 # linux 2.x.x - and platform.uname()[2].startswith('2.4' if system < 99 else '2.6') - # check machine - and ( bool(platform.machine().endswith('64')) - == bool(not (system % 2)))): - return True - return False - -def check_computer_system_op(env, system): - """ - @param system is object path referring to CIM_ComputerSystem instance - passed as argument to some cim function - @return True if this instance matches our system; otherwise a CIMError - will be raised - """ - if not isinstance(system, pywbem.CIMInstanceName): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "\"System\" must be a CIMInstanceName") - our_system = get_computer_system_op('CIM') - chandle = env.get_cimom_handle() - if not chandle.is_subclass(system.namespace, - sub=system.classname, - super=our_system.classname): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Class of \"System\" must be a sublass of %s" % - our_system.classname) - if not 'CreationClassName' in system or not 'Name' in system: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "\"System\" is missing one of keys") - if not chandle.is_subclass(system.namespace, - sub=system['CreationClassName'], - super=our_system.classname): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "CreationClassName of \"System\" must be a sublass of %s" % - our_system.classname) - if system['Name'] != our_system['Name']: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Name of \"System\" does not match \"%s\"" % - our_system['Name']) - return True - - -def match_pkg(pkg, **kwargs): - """ - all not Null and not empty arguments will be matched against pkg - attributes; if all of them match, return True - - possible arguments: - name, epoch, version, release, arch - evra, nevra - """ - for attr in ( 'evra', 'nevra', 'name', 'epoch' - , 'version', 'release', 'arch'): - value = kwargs.get(attr, None) - if value and getattr(pkg, attr) != value: - return False - return True - -class SoftwarePackage: - """ - Just a namespace for common function related to SoftwarePackage provider. - TODO: make it a submodule - """ - - @staticmethod - def object_path2pkg(env, objpath, package_list='installed'): - """ - @param objpath must contain precise information of package, - otherwise a CIM_ERR_NOT_FOUND error is raised - @param package_list one of {'installed', 'all', 'available'} - says, where to look for given package - """ - if not isinstance(objpath, pywbem.CIMInstanceName): - raise TypeError("objpath must be an instance of CIMInstanceName") - if not isinstance(package_list, basestring): - raise TypeError("package_list must be a string") - if not package_list in ('installed', 'all', 'available'): - raise ValueError('unsupported package list "%s"'%package_list) - - if ( not objpath['Name'] or not objpath['SoftwareElementID'] - or not objpath['SoftwareElementID'].startswith(objpath['Name']) - or objpath['SoftwareElementID'].find(objpath['Version']) == -1): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") - if not check_target_operating_system(objpath['TargetOperatingSystem']): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Wrong target operating system.") - if not objpath['Name'] or not objpath['Version']: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - 'Both "Name" and "Version" must be given') - match = RE_NEVRA.match(objpath['SoftwareElementID']) - if not match: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Wrong SotwareElementID. Expected valid nevra" - " (name-[epoch:]version-release.arch).") - if objpath['Version'] != match.group('ver'): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Version does not match version part in SoftwareElementID.") - evra = "{}:{}-{}.{}".format(*( - (match.group(k) if k != "epoch" or match.group(k) else "0") - for k in ("epoch", 'ver', 'rel', 'arch'))) - with YumDB.getInstance(env) as ydb: - pkglist = ydb.doPackageLists(package_list, - showdups=package_list != 'installed') - if package_list != 'all': - pkglist = getattr(pkglist, package_list) - else: - # NOTE: available ∩ installed = ∅ - pkglist = itertools.chain(pkglist.available, pkglist.installed) - exact, _, _ = yum.packages.parsePackages(pkglist, [objpath['Name']]) - for pkg in yum.misc.unique(exact): - if pkg.evra == evra: - return pkg - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "No matching package found.") - - @staticmethod - def object_path2pkg_search(env, objpath): - """ - similar to object_path2pkg, but tries to find best suitable - package matching keys - - If any matching package is already installed, it is returned. - Otherwise available package with highest version is returned. - - @param objpath may be object of CIMInstance or CIMInstanceName and - must contain at least \"Name\" or \"SoftwareElementID\" - @return instance of yum.rpmsack.RPMInstalledPackage in case of - installed package, otherwise yum.packages.YumAvailablePackage - """ - logger = env.get_logger() - if isinstance(objpath, pywbem.CIMInstance): - def _get_key(k): - """@return value of instance's key""" - value = objpath.properties.get(k, None) - if isinstance(value, pywbem.CIMProperty): - return value.value - if value is not None: - return value - logger.log_error('missing key "{}" in inst.props'.format(k)) - return objpath.path[k] if k in objpath.path else None - elif isinstance(objpath, pywbem.CIMInstanceName): - _get_key = lambda k: objpath[k] if k in objpath else None - else: - raise TypeError("objpath must be either CIMInstance" - "or CIMInstanceName") - - # parse and check arguments - match_props = {} # args for match_pkg - if _get_key('SoftwareElementID'): - match = RE_NEVRA.match(_get_key('SoftwareElementID')) - if not match: - raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, - "SoftwareElementID could not be parsed.") - for k in ('name', 'version', 'release', 'arch'): - mkey = k if k not in ('version', 'release') else k[:3] - match_props[k] = match.group(mkey) - if not match.group("epoch"): - match_props["epoch"] = "0" - else: - for k in ('name', 'epoch', 'version', 'release', 'arch'): - ikey = k if k != 'arch' else "architecture" - if _get_key(ikey): - match_props[k] = _get_key(ikey) - - if not match_props: - raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, - "Too few key values given (give at least a Name).") - if not 'name' in match_props: - raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, - "Missing either Name or SoftwareElementID property.") - - # get available packages - pkglist = YumDB.getInstance(env).doPackageLists('all', showdups=True) - # NOTE: available ∩ installed = ∅ - exact, _, _ = yum.packages.parsePackages( - itertools.chain(pkglist.available, pkglist.installed), - [match_props['name']]) - exact = yum.misc.unique(exact) - exact_orig = exact - exact = sorted([ p for p in exact if match_pkg(p, **match_props) ]) - if len(exact) == 0: - logger.log_error('could not find any package for query: {}' - ' in list: {}' - .format(match_props, [p.nevra for p in exact_orig])) - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "No matching package found.") - for pkg in exact: # check, whether package is already installed - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - return pkg - logger.log_info(('found multiple matching packages' - ' for query: {}' if len(exact) > 1 else - 'exact match found for query: {}') - .format(match_props)) - return exact[-1] # select highest version - - @staticmethod - def pkg2model(env, pkg, keys_only=True, model=None): - """ - @param model if None, will be filled with data, otherwise - a new instance of CIMInstance or CIMObjectPath is created - """ - #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - #if not isinstance(pkg, yum.packages.YumHeaderPackage): - if not isinstance(pkg, yum.packages.YumAvailablePackage): - raise TypeError( - "pkg must be an instance of YumAvailablePackage") - if model is None: - model = pywbem.CIMInstanceName('LMI_SoftwarePackage', - namespace='root/cimv2') - if not keys_only: - model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model) - if isinstance(model, pywbem.CIMInstance): - def _set_key(k, value): - """Sets the value of key property of cim instance""" - model[k] = value - model.path[k] = value #pylint: disable=E1103 - else: - _set_key = model.__setitem__ - with YumDB.getInstance(env): - _set_key('Name', pkg.name) - _set_key('SoftwareElementID', pkg.nevra) - _set_key('SoftwareElementState', pywbem.Uint16(2 - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage) - else 1)) - _set_key('TargetOperatingSystem', - pywbem.Uint16(get_target_operating_system()[0])) - _set_key('Version', pkg.version) - if not keys_only: - model['Caption'] = pkg.summary - model['Description'] = pkg.description - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - model['InstallDate'] = pywbem.CIMDateTime( - datetime.fromtimestamp(pkg.installtime)) - if pkg.vendor: - model['Manufacturer'] = pkg.vendor - model['Release'] = pkg.release - model['Epoch'] = pywbem.Uint16(pkg.epoch) - model["Architecture"] = pkg.arch - model['License'] = pkg.license - model['Group'] = pkg.group - model['Size'] = pywbem.Uint64(pkg.size) - return model - -class SoftwareFileCheck: - """ - Just a namespace for functions related to FileCheck provider. - TODO: make it a submodule - """ - - passed_flags_descriptions = ( - "Existence", - "File Type", - "File Size", - "File Mode", - "File Checksum", - "Device major/minor number", - "Symlink Target", - "User Ownership", "Group Ownership", - "Modify Time") - - checksumtype_str2num = dict((val, k) for (k, val) in - yum.constants.RPM_CHECKSUM_TYPES.items()) - - @staticmethod - def pkg_checksum_type(pkg): - """ - @return integer representation of checksum type - """ - if not isinstance(pkg, yum.packages.YumAvailablePackage): - raise TypeError("pkg must be an instance of YumAvailablePackage") - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO] - with YumDB.getInstance(): # ensure, that _yum is inited - return SoftwareFileCheck.checksumtype_str2pywbem( - pkg.yumdb_info.checksum_type) - - @staticmethod - def checksumtype_num2hash(csumt): - """ - @param csumt checksum type as a number obtained from package - @return hash function object corresponding to csumt - """ - return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt]) - - @staticmethod - def checksumtype_str2pywbem(alg): - """ - @param alg is a name of algorithm used for checksum - @return pywbem number corresponding to given alg - """ - try: - res = SoftwareFileCheck.checksumtype_str2num[alg.lower()] - except KeyError: - res = 0 - return pywbem.Uint16(res) - - @staticmethod - def filetype_str2pywbem(file_type): - """ - @param file_type is a name of file type obtained from pkg headers - @return pywbem number corresponding to thus file type - """ - try: - return pywbem.Uint16( - { 'file' : 1 - , 'directory' : 2 - , 'symlink' : 3 - , 'fifo' : 4 - , 'character device' : 5 - , 'block device' : 6 - }[file_type]) - except KeyError: - return pywbem.Uint16(0) - - @staticmethod - def filetype_mode2pywbem(mode): - """ - @param mode is a raw file mode as integer - @return pywbem numeric value of file's type - """ - for i, name in enumerate( - ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1): - if getattr(stat, 'S_IS' + name)(mode): - return pywbem.Uint16(i) - return pywbem.Uint16(0) - - @staticmethod - def mode2pywbem_flags(mode): - """ - @param mode if None, file does not exist - @return list of integer flags describing file's access permissions - """ - if mode is None: - return None - flags = [] - for i, flag in enumerate(( - stat.S_IXOTH, - stat.S_IWOTH, - stat.S_IROTH, - stat.S_IXGRP, - stat.S_IWGRP, - stat.S_IRGRP, - stat.S_IXUSR, - stat.S_IWUSR, - stat.S_IRUSR, - stat.S_ISVTX, - stat.S_ISGID, - stat.S_ISUID)): - if flag & mode: - flags.append(pywbem.Uint8(i)) - return flags - - @staticmethod - def hashfile(afile, hashers, blocksize=65536): - """ - @param hashers is a list of hash objects - @return list of digest strings (in hex format) for each hash object - given in the same order - """ - if not isinstance(hashers, (tuple, list, set, frozenset)): - hashers = (hashers, ) - buf = afile.read(blocksize) - while len(buf) > 0: - for hashfunc in hashers: - hashfunc.update(buf) - buf = afile.read(blocksize) - return [ hashfunc.hexdigest() for hashfunc in hashers ] - - @staticmethod - def compute_checksums(env, checksum_type, file_type, file_path): - """ - @param file_type is not a file, then zeroes are returned - @param checksum_type selected hash algorithm to compute second - checksum - @return (md5sum, checksum) - both checksums are computed from file_path's content - first one is always md5, the second one depends on checksum_type - if file does not exists, (None, None) is returned - """ - hashers = [hashlib.md5()] #pylint: disable=E1101 - if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]: - hashers.append(SoftwareFileCheck.checksumtype_num2hash( - checksum_type)()) - if file_type != SoftwareFileCheck.filetype_str2pywbem('file'): - rslts = ['0'*len(h.hexdigest()) for h in hashers] - else: - try: - with open(file_path, 'rb') as fobj: - rslts = SoftwareFileCheck.hashfile(fobj, hashers) - except (OSError, IOError) as exc: - env.get_logger().log_error("could not open file \"%s\"" - " for reading: %s" % (file_path, exc)) - return None, None - return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2) - - @staticmethod - def object_path2yumcheck(env, objpath): - """ - @return instance of yum.packages._RPMVerifyPackage - this object holds RPMInstalledPackage under its po attribute - """ - if not isinstance(objpath, pywbem.CIMInstanceName): - raise TypeError("objpath must be instance of CIMInstanceName, " - "not \"%s\"" % objpath.__class__.__name__) - - if ( not objpath['Name'] or not objpath['SoftwareElementID'] - or not objpath['CheckID'] - or not objpath['CheckID'].endswith('#'+objpath['Name']) - or objpath['SoftwareElementID'].find(objpath['Version']) == -1): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") - if objpath['SoftwareElementState'] not in ("2", 2): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Only \"Executable\" software element state supported") - if not check_target_operating_system(objpath['TargetOperatingSystem']): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Wrong target operating system.") - if not objpath['Name'] or not objpath['Version']: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - 'Both "Name" and "Version" must be given') - match = RE_NEVRA.match(objpath['SoftwareElementID']) - if not match: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Wrong SotwareElementID. Expected valid nevra" - " (name-epoch:version-release.arch).") - if objpath['Version'] != match.group('ver'): - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Version does not match version part in SoftwareElementID.") - - evra = "{}:{}-{}.{}".format(*( - (match.group(k) if k != "epoch" or match.group(k) else "0") - for k in ("epoch", 'ver', 'rel', 'arch'))) - with YumDB.getInstance(env) as ydb: - pkglist = ydb.doPackageLists('installed') - exact, _, _ = yum.packages.parsePackages( - pkglist.installed, [match.group('name')]) - for pkg in yum.misc.unique(exact): - if pkg.evra != evra: - continue - vpkg = yum.packages._RPMVerifyPackage( - pkg, pkg.hdr.fiFromHeader(), - SoftwareFileCheck.pkg_checksum_type(pkg), [], True) - if not objpath['Name'] in vpkg: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "File not found in RPM package.") - return vpkg - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "No matching package installed.") - - # Named tuple to store results of rpm file check as pywbem values, - # all results are in form: - # (expected, reality) - # where - # expected is value from rpm package - # reality is value obtained from installed file - # None means, that value could not be obtained - # except for "exists" and "md5_checksum" attributes, where "exists" - # is boolean and md5_checksum is a string - # for example: - # file_check.file_type == (4, 3) - FileCheck = collections.namedtuple('FileCheck', - 'exists, md5_checksum, file_type, file_size, file_mode, ' - 'file_checksum, device, link_target, user_id, group_id, ' - 'last_modification_time') - - @staticmethod - def test_file(env, checksum_type, vpf): - """ - @param checksum type is a pywbem value for ChecksumType property - @return instance of FileCheck - """ - if not isinstance(vpf, yum.packages._RPMVerifyPackageFile): - raise TypeError("vpf must be an instance of _RPMVerifyPackage," - " not \"%s\"" % vpf.__class__.__name__) - exists = os.path.lexists(vpf.filename) - md5_checksum = None - expected = { - "file_type" : SoftwareFileCheck.filetype_str2pywbem(vpf.ftype), - "user_id" : pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid), - "group_id" : pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid), - "file_mode" : pywbem.Uint32(vpf.mode), - "file_size" : pywbem.Uint64(vpf.size), - "link_target" : vpf.readlink if vpf.readlink else None, - "file_checksum" : vpf.digest[1], - "device" : (pywbem.Uint64(vpf.dev) - if vpf.ftype.endswith('device') else None), - "last_modification_time" : pywbem.Uint64(vpf.mtime) - } - if not exists: - reality = collections.defaultdict(lambda: None) - else: - fstat = os.lstat(vpf.filename) - reality = { - "file_type" : SoftwareFileCheck.filetype_mode2pywbem( - fstat.st_mode), - "user_id" : pywbem.Uint32(fstat.st_uid), - "group_id" : pywbem.Uint32(fstat.st_gid), - "file_mode" : pywbem.Uint32(fstat.st_mode), - "file_size" : pywbem.Uint64(fstat.st_size), - "last_modification_time" : pywbem.Uint64(fstat.st_mtime) - } - reality["device"] = (pywbem.Uint64(fstat.st_dev) - if reality['file_type'] == - SoftwareFileCheck.filetype_str2pywbem("device") else None) - reality["link_target"] = (os.readlink(vpf.filename) - if os.path.islink(vpf.filename) else None) - md5_checksum, checksum = SoftwareFileCheck.compute_checksums( - env, checksum_type, reality["file_type"], vpf.filename) - reality["file_checksum"] = checksum - kwargs = dict(exists=exists, md5_checksum=md5_checksum, - **dict((k, (expected[k], reality[k])) for k in expected)) - return SoftwareFileCheck.FileCheck(**kwargs) - - @staticmethod - def filecheck_passed(file_check): - """ - @return True if installed file passed all checks. - """ - if not isinstance(file_check, SoftwareFileCheck.FileCheck): - raise TypeError("file_check must be an instance of FileCheck") - passed = file_check.exists - for k, val in file_check._asdict().items(): - if not passed: - break - if not isinstance(val, tuple): - continue - if ( k == "last_modification_time" - and file_check.file_type[0] != \ - SoftwareFileCheck.filetype_str2pywbem("file")): - continue - if ( k == "file_mode" - and file_check.file_type[0] == \ - SoftwareFileCheck.filetype_str2pywbem("symlink")): - continue - passed = val[0] == val[1] - return passed - - @staticmethod - def _filecheck2model_flags(file_check): - """ - @param file_check is an instance of FileCheck - @return pywbem value for PassedFlags property - """ - flags = [] - for k, value in file_check._asdict().items(): #pylint: disable=W0212 - if isinstance(value, tuple): - if ( k == "last_modification_time" - and file_check.file_type[0] != \ - SoftwareFileCheck.filetype_str2pywbem('file')): - # last_modification_time check is valid only for - # regular files - flag = file_check.exists - elif ( k == "file_mode" - and file_check.file_type[0] == \ - SoftwareFileCheck.filetype_str2pywbem('symlink')): - # do not check mode of symlinks - flag = ( file_check.exists - and ( file_check.file_type[0] - == file_check.file_type[1])) - else: - flag = file_check.exists and value[0] == value[1] - flags.append(flag) - elif isinstance(value, bool): - flags.append(value) - return flags - - @staticmethod - def _fill_non_key_values(env, model, pkg, vpf, file_check): - """ - Fills a non key values into instance of SoftwareFileCheck. - """ - model['FileName'] = os.path.basename(vpf.filename) - model['FileChecksumType'] = csumt = \ - pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg)) - if file_check is None: - file_check = SoftwareFileCheck.test_file(env, csumt, vpf) - for mattr, fattr in ( - ('FileType', 'file_type'), - ('FileUserID', 'user_id'), - ('FileGroupID', 'group_id'), - ('FileMode', 'file_mode'), - ('LastModificationTime', 'last_modification_time'), - ('FileSize', 'file_size'), - ('LinkTarget', 'link_target'), - ('FileChecksum', 'file_checksum')): - exp, rea = getattr(file_check, fattr) - if exp is not None: - model['Expected' + mattr] = exp - if rea is not None: - model[mattr] = rea - model['ExpectedFileModeFlags'] = \ - SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[0]) - if file_check.exists: - model['FileModeFlags'] = \ - SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[1]) - model['FileExists'] = file_check.exists - if file_check.md5_checksum is not None: - model['MD5Checksum'] = file_check.md5_checksum - model['PassedFlags'] = SoftwareFileCheck._filecheck2model_flags( - file_check) - model['PassedFlagsDescriptions'] = list( - SoftwareFileCheck.passed_flags_descriptions) - - @staticmethod - def filecheck2model(vpkg, file_name, env, keys_only=True, - model=None, file_check=None): - """ - @param vpkg is an instance of yum.packages_RPMVerifyPackage - @param file_name a absolute file path contained in package - @param keys_only if True, then only key values will be filed - @param model if given, then this instance will be modified and - returned - @param file_check if not given, it will be computed - @return instance of LMI_SoftwareFileCheck class with all desired - values filed - """ - if not isinstance(vpkg, yum.packages._RPMVerifyPackage): - raise TypeError( - "vpkg must be an instance of _RPMVerifyPackage") - if not file_name in vpkg: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "File \"%s\" not found among package files" % file_name) - if model is None: - model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck", - namespace="root/cimv2") - if not keys_only: - model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model) - if file_check is not None: - if not isinstance(file_check, SoftwareFileCheck.FileCheck): - raise TypeError("file_check must be an instance of FileCheck") - pkg = vpkg.po - vpf = vpkg._files[file_name] - model['Name'] = vpf.filename - model['SoftwareElementID'] = pkg.nevra - model['SoftwareElementState'] = pywbem.Uint16(2) - model['TargetOperatingSystem'] = pywbem.Uint16( - get_target_operating_system()[0]) - model['Version'] = pkg.version - model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename) - if not keys_only: - SoftwareFileCheck._fill_non_key_values( - env, model, pkg, vpf, file_check) - return model - |