diff options
Diffstat (limited to 'src/software/providers/util/common.py')
-rw-r--r-- | src/software/providers/util/common.py | 804 |
1 files changed, 804 insertions, 0 deletions
diff --git a/src/software/providers/util/common.py b/src/software/providers/util/common.py new file mode 100644 index 0000000..4190d41 --- /dev/null +++ b/src/software/providers/util/common.py @@ -0,0 +1,804 @@ +# -*- encoding: utf-8 -*- +# Software Management Providers +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +"""Common utilities for LMI_Software* providers +""" + +import collections +from datetime import datetime +import grp +import hashlib +import itertools +import os +import platform +import pwd +import re +import rpm +import socket +import stat +import pywbem +import yum +import cmpi_pywbem_bindings as pycimmb +import singletonmixin + +re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)' + r'-(?P<rel>.+)\.(?P<arch>[^.]+)$') +re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' + r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') + +rpmdb_path = '/var/lib/rpm/Packages' + +class YumDB(singletonmixin.Singleton): + + ignoreSubsequent = True + + def __init__(self, env, *args, **kwargs): + if not isinstance(env, pycimmb.ProviderEnvironment): + raise TypeError("env must be instance of" + " pycimmb.ProviderEnvironment") + self._yum_args = (args, kwargs) + self._yum = None + self._db_mtime = 0 + self._lock_cnt = 0 + self.env = env + env.get_logger().log_info('init called') + + def is_dirty(self): + return self._db_mtime < os.stat(rpmdb_path).st_mtime + + def is_locked(self): + return self._yum._lockfile is not None + + def update_db(self): + self.env.get_logger().log_info('updating rpmdb') + self._db_mtime = os.stat(rpmdb_path).st_mtime + self._yum.doConfigSetup() + self._yum.doTsSetup() + self._yum.doRpmDBSetup() + + def __enter__(self): + self._lock_cnt += 1 + if self._lock_cnt < 2: + self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1]) + if not self.is_locked() and self.is_dirty(): + self.update_db() + self._yum.doLock() + return self + + def __exit__(self, exc_type, exc_value, traceback): + if self._lock_cnt == 1: + del self._yum + self._lock_cnt -= 1 + + def __getattr__(self, name): + if not self.is_locked() and self.is_dirty(): + self.update_db() + return getattr(self._yum, name) + + +def _get_distname(): + if hasattr(platform, 'linux_distribution'): + return platform.linux_distribution( + full_distribution_name=False)[0].lower() + else: + return platform.dist()[0].lower() + + +def get_target_operating_system(): + """ + @return (val, text). + Where val is a number from ValueMap of TargetOperatingSystem property + of CIM_SoftwareElement class and text is its testual representation. + """ + + system = platform.system() + if system.lower() == 'linux': + try: + val, dist = \ + { 'redhat' : (79, 'RedHat Enterprise Linux') + , 'suse' : (81, 'SUSE') + , 'mandriva' : (88, 'Mandriva') + , 'ubuntu' : (93, 'Ubuntu') + , 'debian' : (95, 'Debian') + }[_get_distname()] + except KeyError: + linrel = platform.uname()[2] + if linrel.startswith('2.4'): + val, dist = (97, 'Linux 2.4.x') + elif linrel.startswith('2.6'): + val, dist = (99, 'Linux 2.6.x') + else: + return (36, 'LINUX') # no check for x86_64 + if platform.machine() == 'x86_64': + val += 1 + dist += ' 64-Bit' + return (val, dist) + elif system.lower() in ('macosx', 'darwin'): + return (2, 'MACOS') + # elif system.lower() == 'windows': # no general value + else: + return (0, 'Unknown') + +def get_computer_system_op(): + return pywbem.CIMInstanceName( + classname='CIM_ComputerSystem', + keybindings={ + "CreationClassName": "CIM_ComputerSystem" + , "Name" : socket.gethostname() }, + namespace="root/cimv2") + +def check_target_operating_system(system): + """ + @return if param system matches current target operating system + """ + if isinstance(system, basestring): + system = int(system) + if not isinstance(system, (int, long)): + raise TypeError("system must be either string or integer, not {}" + .format(system.__class__.__name__)) + tos = get_target_operating_system() + if system == tos: return True + if system == 36: # linux + if platform.system().lower() == "linux": return True + if ( system >= 97 and system <= 100 # linux 2.x.x + and platform.uname()[2].startswith('2.4' if system < 99 else '2.6') + # check machine + and ( bool(platform.machine().endswith('64')) + == bool(not (system % 2)))): + return True + return False + +def check_computer_system_op(env, system): + if not isinstance(system, pywbem.CIMInstanceName): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "\"System\" must be a CIMInstanceName") + our_system = get_computer_system_op() + ch = env.get_cimom_handle() + if not ch.is_subclass(system.namespace, + sub=system.classname, + super=our_system.classname): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Class of \"System\" must be a sublass of %s" % + our_system.classname) + if not 'CreationClassName' in system or not 'Name' in system: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "\"System\" is missing one of keys") + if not ch.is_subclass(system.namespace, + sub=system['CreationClassName'], + super=our_system.classname): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "CreationClassName of \"System\" must be a sublass of %s" % + our_system.classname) + if system['Name'] != our_system['Name']: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Name of \"System\" does not match \"%s\"" % + our_system['Name']) + return True + + +def match_pkg(pkg, **kwargs): + """ + all not Null and not empty arguments will be matched against pkg + attributes; if all of them match, return True + + possible arguments: + name, epoch, version, release, arch + evra, nevra + """ + for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'): + v = kwargs.get(a, None) + if v and getattr(pkg, a) != v: + return False + return True + +class SoftwarePackage: + """ + Just a namespace for common function related to SoftwarePackage provider. + """ + + @staticmethod + def parse_group(group, env): + try: + res = \ + { "amusements/games" : 3 + , "amusements/graphics" : 4 + , "applications/archiving" : 5 + , "applications/communications" : 6 + , "applications/databases" : 7 + , "applications/editors" : 8 + , "applications/emulators" : 9 + , "applications/engineering" : 10 + , "applications/file" : 11 + , "applications/internet" : 12 + , "applications/multimedia" : 13 + , "applications/productivity" : 14 + , "applications/publishing" : 15 + , "applications/system" : 16 + , "applications/text" : 17 + , "development/build tools" : 18 + , "development/debug" : 19 + , "development/debuggers" : 20 + , "development/documentation" : 21 + , "development/java" : 22 + , "development/languages" : 23 + , "development/libraries" : 24 + , "development/libraries/java" : 25 + , "development/system" : 26 + , "development/tools" : 27 + , "documentation" : 28 + , "internet/www/dynamic content" : 29 + , "system/libraries" : 30 + , "system environment/base" : 31 + , "system environment/daemons" : 32 + , "system environment/kernel" : 33 + , "system environment/libraries" : 34 + , "system environment/shells" : 35 + , "text processing/markup/xml" : 36 + , "user interface/desktops" : 37 + , "user interface/x" : 38 + , "user interface/x hardware support" : 39 + , "utilities" : 40 + }[group.lower()] + except KeyError: + logger = env.get_logger() + if not group or group.lower() == "unspecified": + logger.log_info("unspecified group '{}'".format(group)) + res = 2 + else: + logger.log_error("failed to parse group '{}'".format(group)) + res = 0 + return pywbem.Uint16(res) + + @staticmethod + def object_path2pkg(env, op, package_list='installed'): + """ + @param op must contain precise information of package, + otherwise a CIM_ERR_NOT_FOUND error is raised + @param package_list one of {'installed', 'all', 'available'} + says, where to look for given package + """ + if not isinstance(op, pywbem.CIMInstanceName): + raise TypeError("op must be an instance of CIMInstanceName") + if not isinstance(package_list, basestring): + raise TypeError("package_list must be a string") + if not package_list in ('installed', 'all', 'available'): + raise ValueError('unsupported package list "%s"'%package_list) + + tos = get_target_operating_system()[0] + if ( not op['Name'] or not op['SoftwareElementID'] + or not op['SoftwareElementID'].startswith(op['Name']) + or op['SoftwareElementID'].find(op['Version']) == -1): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") +# if op['SoftwareElementState'] not in ("2", 2): +# raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, +# "Only \"Executable\" software element state supported") + if not check_target_operating_system(op['TargetOperatingSystem']): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Wrong target operating system.") + if not op['Name'] or not op['Version']: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + 'Both "Name" and "Version" must be given') + m = re_nevra.match(op['SoftwareElementID']) + if not m: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Wrong SotwareElementID. Expected valid nevra" + " (name-[epoch:]version-release.arch).") + if op['Version'] != m.group('ver'): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Version does not match version part in SoftwareElementID.") + evra = "{}:{}-{}.{}".format(*( + (m.group(k) if k != "epoch" or m.group(k) else "0") + for k in ("epoch", 'ver', 'rel', 'arch'))) + with YumDB.getInstance(env) as yb: + pl = yb.doPackageLists(package_list, + showdups=package_list != 'installed') + if package_list != 'all': + pl = getattr(pl, package_list) + else: + # NOTE: available ∩ installed = ∅ + pl = itertools.chain(pl.available, pl.installed) + exact,_,_ = yum.packages.parsePackages(pl, [op['Name']]) + for pkg in yum.misc.unique(exact): + if pkg.evra == evra: break + else: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "No matching package found.") + return pkg + + @staticmethod + def object_path2pkg_search(env, op): + """ + similar to object_path2pkg, but tries to find best suitable + package matching keys + + If any matching package is already installed, it is returned. + Otherwise available package with highest version is returned. + + @param op may be object of CIMInstance or CIMInstanceName and + must contain at least \"Name\" or \"SoftwareElementID\" + @return instance of yum.rpmsack.RPMInstalledPackage in case of + installed package, otherwise yum.packages.YumAvailablePackage + """ + logger = env.get_logger() + if isinstance(op, pywbem.CIMInstance): + def _get_key(k): + v = op.properties.get(k, None) + if isinstance(v, pywbem.CIMProperty): return v.value + if v is not None: return v + logger.log_error('missing key "{}" in inst.props'.format(k)) + return op.path[k] if k in op.path else None + elif isinstance(op, pywbem.CIMInstanceName): + _get_key = lambda k: op[k] if k in op else None + else: + raise TypeError("op must be either CIMInstance or CIMInstanceName") + + # parse and check arguments + match_props = {} # args for match_pkg + if _get_key('SoftwareElementID'): + m = re_nevra.match(_get_key('SoftwareElementID')) + if not m: + raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, + "SoftwareElementID could not be parsed.") + match_props['nevra'] = _get_key('SoftwareElementID') + match_props['name'] = m.group('name') + else: + for matchattr, instattr in ( + ('name', 'name'), ('epoch', 'epoch'), ('version', 'ver'), + ('release', 'rel'), ('arch', 'arch')): + if _get_key(matchattr): + match_props[matchattr] = _get_key(matchattr) + + if not match_props: + raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, + "Too few key values given (give at least a Name).") + if not 'name' in match_props and not 'nevra' in match_props: + raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, + "Missing either Name or SoftwareElementID property.") + + # get available packages + pl = YumDB.getInstance(env).doPackageLists('all', showdups=True) + # NOTE: available ∩ installed = ∅ + exact,_,_ = yum.packages.parsePackages( + itertools.chain(pl.available, pl.installed), + [match_props['name']]) + exact = yum.misc.unique(exact) + exact_orig = exact + exact = sorted( [ p for p in exact if match_pkg(p, **match_props) ] + , key=lambda a: a.evra) + if len(exact) == 0: + logger.log_error('could not find any package for query: {}' + ' in list: {}' + .format(match_props, [p.nevra for p in exact_orig])) + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "No matching package found.") + for pkg in exact: # check, whether package is already installed + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + return pkg + logger.log_info(('found multiple matching packages' + ' for query: {}' if len(exact) > 1 else + 'exact match found for query: {}') + .format(match_props)) + return exact[-1] # select highest version + + @staticmethod + def pkg2model_wrapper(namespace, classname): + """ + @return a function that transforms YumAvailablePackage object + to CIMInstanceName object + """ + + tos = pywbem.Uint16(get_target_operating_system()[0]) + + def pkg2model(env, pkg, keys_only=True, model=None): + """ + @param model if None, will be filled with data, otherwise + a new instance of CIMInstance or CIMObjectPath is created + """ + #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + #if not isinstance(pkg, yum.packages.YumHeaderPackage): + if not isinstance(pkg, yum.packages.YumAvailablePackage): + raise TypeError( + "pkg must be an instance of YumAvailablePackage") + if model is None: + model = pywbem.CIMInstanceName(classname, namespace=namespace) + if not keys_only: + model = pywbem.CIMInstance(classname, path=model) + if isinstance(model, pywbem.CIMInstance): + def _set_key(k, v): + model[k] = v + model.path[k] = v + else: + _set_key = model.__setitem__ + with YumDB.getInstance(env): + _set_key('Name', pkg.name) + _set_key('SoftwareElementID', pkg.nevra) + _set_key('SoftwareElementState', pywbem.Uint16(2 + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage) + else 1)) + _set_key('TargetOperatingSystem', tos) + _set_key('Version', pkg.version) + if not keys_only: + model['Caption'] = pkg.summary + model['Description'] = pkg.description + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + model['InstallDate'] = pywbem.CIMDateTime( + datetime.fromtimestamp(pkg.installtime)) + if pkg.vendor: + model['Manufacturer'] = pkg.vendor + model['Release'] = pkg.release + model['Epoch'] = pywbem.Uint16(pkg.epoch) + model["Architecture"] = pkg.arch + model['License'] = pkg.license + model['Group'] = pkg.group + model['Size'] = pywbem.Uint64(pkg.size) + return model + + return pkg2model + +class SoftwareFileCheck: + """ + Just a namespace for functions related to FileCheck provider. + """ + + passed_flags_descriptions = ( + "Existence", + "File Type", + "File Size", + "File Mode", + "Checksum", + "Device major/minor number", + "Symlink Target", + "User Ownership", "Group Ownership", + "Modify Time") + + checksumtype_str2num = dict((v, k) for (k, v) in + yum.constants.RPM_CHECKSUM_TYPES.items()) + + @staticmethod + def pkg_checksum_type(pkg): + """ + @return integer representation of checksum type + """ + if not isinstance(pkg, yum.packages.YumAvailablePackage): + raise TypeError("pkg must be an instance of YumAvailablePackage") + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO] + with self: # ensure, that _yum is inited + return SoftwareFileCheck.checksumtype_str2pywbem( + pkg.yumdb_info.checksum_type) + + @staticmethod + def checksumtype_num2hash(csumt): + return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt]) + + @staticmethod + def checksumtype_str2pywbem(alg): + try: + res = SoftwareFileCheck.checksumtype_str2num[alg.lower()] + except KeyError: res = 0 + return pywbem.Uint16(res) + + @staticmethod + def filetype_str2pywbem(ft): + try: + return pywbem.Uint16( + { 'file' : 1 + , 'directory' : 2 + , 'symlink' : 3 + , 'fifo' : 4 + , 'character device' : 5 + , 'block device' : 6 + }[ft]) + except KeyError: + return pywbem.Uint16(0) + + @staticmethod + def filetype_mode2pywbem(mode): + for i, name in enumerate( + ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1): + if getattr(stat, 'S_IS' + name)(mode): + return pywbem.Uint16(i) + return pywbem.Uint16(0) + + @staticmethod + def mode2pywbem_flags(mode): + """ + @param mode if None, file does not exist + """ + if mode is None: return None + flags = [] + for i, c in enumerate(( + stat.S_IXOTH, + stat.S_IWOTH, + stat.S_IROTH, + stat.S_IXGRP, + stat.S_IWGRP, + stat.S_IRGRP, + stat.S_IXUSR, + stat.S_IWUSR, + stat.S_IRUSR, + stat.S_ISVTX, + stat.S_ISGID, + stat.S_ISUID)): + if c & mode: + flags.append(pywbem.Uint8(i)) + return flags + + @staticmethod + def hashfile(afile, hashers, blocksize=65536): + """ + @param hashers is a list of hash objects + @return list of digest strings (in hex format) for each hash object + given in the same order + """ + if not isinstance(hashers, (tuple, list, set, frozenset)): + hashers = (hashers, ) + buf = afile.read(blocksize) + while len(buf) > 0: + for h in hashers: h.update(buf) + buf = afile.read(blocksize) + return [ h.hexdigest() for h in hashers ] + + @staticmethod + def compute_checksums(env, checksum_type, file_type, file_path): + """ + @param file_type is not a file, then zeroes are returned + @param checksum_type selected hash algorithm to compute second + checksum + @return (md5sum, checksum) + both checksums are computed from file_path's content + first one is always md5, the second one depends on checksum_type + if file does not exists, (None, None) is returned + """ + hashers = [hashlib.md5()] + if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]: + hashers.append(SoftwareFileCheck.checksumtype_num2hash(checksum_type)()) + if file_type != SoftwareFileCheck.filetype_str2pywbem('file'): + rslts = ['0'*len(h.hexdigest()) for h in hashers] + else: + try: + with open(file_path, 'rb') as f: + rslts = SoftwareFileCheck.hashfile(f, hashers) + except (OSError, IOError) as e: + env.get_logger().log_error("could not open file \"%s\"" + " for reading: %s" % (file_path, e)) + return None, None + return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2) + + @staticmethod + def object_path2yumcheck(env, op): + """ + @return instance of yum.packages._RPMVerifyPackage + this object holds RPMInstalledPackage under its po attribute + """ + if not isinstance(op, pywbem.CIMInstanceName): + raise TypeError("op must be instance of CIMInstanceName, " + "not \"%s\"" % op.__class__.__name__) + log = env.get_logger() + + tos = get_target_operating_system()[0] + if ( not op['Name'] or not op['SoftwareElementID'] + or not op['CheckID'] + or not op['CheckID'].endswith('#'+op['Name']) + or op['SoftwareElementID'].find(op['Version']) == -1): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") + if op['SoftwareElementState'] not in ("2", 2): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Only \"Executable\" software element state supported") + if not check_target_operating_system(op['TargetOperatingSystem']): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Wrong target operating system.") + if not op['Name'] or not op['Version']: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + 'Both "Name" and "Version" must be given') + m = re_nevra.match(op['SoftwareElementID']) + if not m: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Wrong SotwareElementID. Expected valid nevra" + " (name-epoch:version-release.arch).") + if op['Version'] != m.group('ver'): + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "Version does not match version part in SoftwareElementID.") + + evra = "{}:{}-{}.{}".format(*( + (m.group(k) if k != "epoch" or m.group(k) else "0") + for k in ("epoch", 'ver', 'rel', 'arch'))) + with YumDB.getInstance(env) as yb: + pl = yb.doPackageLists('installed') + exact, matched, unmatched = yum.packages.parsePackages( + pl.installed, [m.group('name')]) + exact = yum.misc.unique(exact) + for pkg in exact: + if pkg.evra == evra: break + else: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "No matching package installed.") + + fi = pkg.hdr.fiFromHeader() + vpkg = yum.packages._RPMVerifyPackage(pkg, fi, + SoftwareFileCheck.pkg_checksum_type(pkg), [], True) + if not op['Name'] in vpkg: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "File not found in RPM package.") + return vpkg + + """ + Named tuple to store results of rpm file check as pywbem values, + all results are in form: + (expected, reality) + where + expected is value from rpm package + reality is value obtained from installed file + None means, that value could not be obtained + except for "exists" and "md5_checksum" attributes, where "exists" + is boolean and md5_checksum is a string + for example: + file_check.file_type == (4, 3) + """ + FileCheck = collections.namedtuple('FileCheck', + 'exists, md5_checksum, file_type, file_size, file_mode, ' + 'checksum, device, link_target, user_id, group_id, ' + 'last_modification_time') + + @staticmethod + def test_file(env, checksum_type, vpf): + """ + @param checksum type is a pywbem value for ChecksumType property + @return instance of FileCheck + """ + if not isinstance(vpf, yum.packages._RPMVerifyPackageFile): + raise TypeError("vpf must be an instance of _RPMVerifyPackage," + " not \"%s\"" % vpf.__class__.__name__) + exists = os.path.lexists(vpf.filename) + md5_checksum = None + expected = { + "file_type" : SoftwareFileCheck.filetype_str2pywbem(vpf.ftype), + "user_id" : pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid), + "group_id" : pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid), + "file_mode" : pywbem.Uint32(vpf.mode), + "file_size" : pywbem.Uint64(vpf.size), + "link_target" : vpf.readlink if vpf.readlink else None, + "checksum" : vpf.digest[1], + "device" : (pywbem.Uint64(vpf.dev) + if vpf.ftype.endswith('device') else None), + "last_modification_time" : pywbem.Uint64(vpf.mtime) + } + if not exists: + reality = collections.defaultdict(lambda: None) + else: + fstat = os.lstat(vpf.filename) + reality = { + "file_type" : SoftwareFileCheck.filetype_mode2pywbem(fstat.st_mode), + "user_id" : pywbem.Uint32(fstat.st_uid), + "group_id" : pywbem.Uint32(fstat.st_gid), + "file_mode" : pywbem.Uint32(fstat.st_mode), + "file_size" : pywbem.Uint64(fstat.st_size), + "last_modification_time" : pywbem.Uint64(fstat.st_mtime) + } + reality["device"] = (pywbem.Uint64(fstat.st_dev) + if reality['file_type'] == SoftwareFileCheck.filetype_str2pywbem( + "device") else None) + reality["link_target"] = (os.readlink(vpf.filename) + if os.path.islink(vpf.filename) else None) + md5_checksum, checksum = SoftwareFileCheck.compute_checksums( + env, checksum_type, reality["file_type"], vpf.filename) + reality["checksum"] = checksum + kwargs = dict(exists=exists, md5_checksum=md5_checksum, + **dict((k, (expected[k], reality[k])) for k in expected)) + return SoftwareFileCheck.FileCheck(**kwargs) + + @staticmethod + def filecheck_passed(fc): + if not isinstance(fc, SoftwareFileCheck.FileCheck): + raise TypeError("fc must be an instance of FileCheck") + return ( fc.exists + and all( v[0] == v[1] + for k, v in fc._asdict().items() if ( + isinstance(v, tuple) + and ( k != "last_modification_time" + or fc.file_type[0] == \ + SoftwareFileCheck.filetype_str2pywbem("file") + )))) + + @staticmethod + def filecheck_wrapper(namespace, classname): + + def _filecheck2model_flags(fc): + flags = [] + for k, v in fc._asdict().items(): + if isinstance(v, tuple): + if ( k != "last_modification_time" + or fc.file_type[0] == SoftwareFileCheck.filetype_str2pywbem( + 'file')): + # last_modification_time check is valid only for + # regular files + flag = fc.exists and v[0] == v[1] + else: + flag = True + flags.append(flag) + elif isinstance(v, bool): + flags.append(v) + return flags + + def filecheck2model(vpkg, fn, env, keys_only=True, + model=None, fc=None): + if not isinstance(vpkg, yum.packages._RPMVerifyPackage): + raise TypeError( + "vpkg must be an instance of _RPMVerifyPackage") + if not fn in vpkg: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "File \"%s\" not found among package files" % fn) + if model is None: + model = pywbem.CIMInstanceName(classname, namespace=namespace) + if not keys_only: + model = pywbem.CIMInstance(classname, path=model) + if fc is not None: + if not isinstance(fc, SoftwareFileCheck.FileCheck): + raise TypeError("fc must be an instance of FileCheck") + pkg = vpkg.po + vpf = vpkg._files[fn] + model['Name'] = vpf.filename + model['SoftwareElementID'] = pkg.nevra + model['SoftwareElementState'] = pywbem.Uint16(2) + model['TargetOperatingSystem'] = pywbem.Uint16( + get_target_operating_system()[0]) + model['Version'] = pkg.version + model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename) + if not keys_only: + #model['Caption'] = '' # TODO + #model['CheckMode'] = bool() # TODO + #model['CRC1'] = pywbem.Uint32() # TODO + #model['CRC2'] = pywbem.Uint32() # TODO + #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO + #model['Description'] = '' # TODO + #model['ElementName'] = '' # TODO + #model['InstanceID'] = '' # TODO + model['FileName'] = os.path.basename(vpf.filename) + model['ChecksumType'] = csumt = \ + pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg)) + if fc is None: + fc = SoftwareFileCheck.test_file(env, csumt, vpf) + for mattr, fattr in ( + ('FileType', 'file_type'), + ('FileUserID', 'user_id'), + ('FileGroupID', 'group_id'), + ('FileMode', 'file_mode'), + ('LastModificationTime', 'last_modification_time'), + ('FileSize', 'file_size'), + ('LinkTarget', 'link_target'), + ('Checksum', 'checksum')): + exp, rea = getattr(fc, fattr) + if exp is not None: + model['Expected' + mattr] = exp + if rea is not None: + model[mattr] = rea + model['ExpectedFileModeFlags'] = \ + SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0]) + if fc.exists: + model['FileModeFlags'] = SoftwareFileCheck.mode2pywbem_flags( + fc.file_mode[1]) + model['FileExists'] = fc.exists + if fc.md5_checksum is not None: + model['MD5Checksum'] = fc.md5_checksum + model['PassedFlags'] = _filecheck2model_flags(fc) + model['PassedFlagsDescriptions'] = list( + SoftwareFileCheck.passed_flags_descriptions) + return model + + return filecheck2model + |