# Software Management Providers
#
# Copyright (C) 2012 Red Hat, Inc.  All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Common utilities for LMI_Yum* providers
"""

import collections
from datetime import datetime
import grp
import hashlib
import os
import platform
import pwd
import re
import socket
import stat
import pywbem
import yum
import cmpi_pywbem_bindings as pycimmb
import singletonmixin

# "epoch:version-release.arch"
re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
                     r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
# "name-epoch:version-release.arch"; the whole evra part is also captured
re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
                      r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')

# file whose modification time is used to detect changes of the rpm database
rpmdb_path = '/var/lib/rpm/Packages'


class YumDB(singletonmixin.Singleton):
    """
    Singleton wrapper around yum.YumBase.

    It is a reentrant context manager: the yum lock is taken when the
    outermost "with" block is entered and released when that block is left.
    Unknown attributes are delegated to the wrapped YumBase object.
    """

    # flag consumed by singletonmixin.Singleton
    ignoreSubsequent = True

    def __init__(self, env, *args, **kwargs):
        """
        @param env must be an instance of pycimmb.ProviderEnvironment
        remaining arguments are passed to yum.YumBase
        """
        if not isinstance(env, pycimmb.ProviderEnvironment):
            raise TypeError("env must be instance of"
                            " pycimmb.ProviderEnvironment")
        self._yum = yum.YumBase(*args, **kwargs)
        self._db_mtime = 0      # mtime of rpmdb_path seen by last update_db
        self._lock_cnt = 0      # nesting depth of active "with" blocks
        self.env = env
        env.get_logger().log_info('init called')

    def is_dirty(self):
        """
        @return True if rpmdb_path was modified since the last update_db()
        """
        return self._db_mtime < os.stat(rpmdb_path).st_mtime

    def is_locked(self):
        """
        @return True if wrapped YumBase object holds a lock file
        """
        return self._yum._lockfile is not None

    def update_db(self):
        """
        Remember current mtime of rpmdb_path and set up yum again.
        """
        self.env.get_logger().log_info('updating rpmdb')
        self._db_mtime = os.stat(rpmdb_path).st_mtime
        self._yum.doConfigSetup()
        self._yum.doTsSetup()
        self._yum.doRpmDBSetup()

    def __enter__(self):
        if self._lock_cnt < 1:
            if not self.is_locked() and self.is_dirty():
                self.update_db()
            self._yum.doLock()
        # Counted only after doLock() succeeded: __exit__ is not called when
        # __enter__ raises, so counting earlier would leave the counter stuck
        # above zero and no later "with" block would ever lock again.
        self._lock_cnt += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # unbalanced calls are ignored
        if self._lock_cnt > 0:
            self._lock_cnt -= 1
            # release only when the outermost block is left, mirroring
            # __enter__ which locks only on the outermost entry
            if self._lock_cnt == 0:
                try:
                    self._yum.closeRpmDB()
                finally:
                    self._yum.doUnlock()

    def __getattr__(self, name):
        # called only for attributes not found on YumDB itself
        if not self.is_locked() and self.is_dirty():
            self.update_db()
        return getattr(self._yum, name)


def _get_distname():
    """
    @return lowercased short name of running distribution
    """
    if hasattr(platform, 'linux_distribution'):
        return platform.linux_distribution(
                full_distribution_name=False)[0].lower()
    else:
        return platform.dist()[0].lower()


def get_target_operating_system():
    """
    @return (val, text).
    Where val is a number from ValueMap of TargetOperatingSystem property
    of CIM_SoftwareElement class and text is its textual representation.
    """
    system = platform.system().lower()
    if system == 'linux':
        known_dists = {
            'redhat': (79, 'RedHat Enterprise Linux'),
            'suse': (81, 'SUSE'),
            'mandriva': (88, 'Mandriva'),
            'ubuntu': (93, 'Ubuntu'),
            'debian': (95, 'Debian'),
        }
        try:
            val, dist = known_dists[_get_distname()]
        except KeyError:
            # unknown distribution, fall back to the kernel release
            linrel = platform.uname()[2]
            if linrel.startswith('2.4'):
                val, dist = (97, 'Linux 2.4.x')
            elif linrel.startswith('2.6'):
                val, dist = (99, 'Linux 2.6.x')
            else:
                return (36, 'LINUX')    # no check for x86_64
        # the 64-bit variant directly follows the 32-bit one
        if platform.machine() == 'x86_64':
            val += 1
            dist += ' 64-Bit'
        return (val, dist)
    elif system in ('macosx', 'darwin'):
        return (2, 'MACOS')
    # there is no general value for windows
    else:
        return (0, 'Unknown')


def check_target_operating_system(system):
    """
    @param system number of target operating system given either as an
    integer or as a string
    @return if param system matches current target operating system
    """
    if isinstance(system, basestring):
        system = int(system)
    if not isinstance(system, (int, long)):
        raise TypeError("system must be either string or integer, not {}"
                .format(system.__class__.__name__))
    # get_target_operating_system() returns (val, text); only the number
    # is comparable with system
    tos = get_target_operating_system()[0]
    if system == tos:
        return True
    if system == 36:    # linux
        if platform.system().lower() == "linux":
            return True
    if (   97 <= system <= 100    # linux 2.x.x
       and platform.uname()[2].startswith('2.4' if system < 99 else '2.6')
           # check machine: even values stand for 64-bit variants
       and (   bool(platform.machine().endswith('64'))
            == bool(not (system % 2)))):
        return True
    return False


def match_pkg(pkg, **kwargs):
    """
    all not Null and not empty arguments will be matched against pkg
    attributes; if all of them match, return True

    possible arguments:
        name, epoch, version, release, arch
        evra, nevra
    """
    for attr in ('evra', 'nevra', 'name', 'epoch', 'version', 'release',
                 'arch'):
        value = kwargs.get(attr, None)
        if value and getattr(pkg, attr) != value:
            return False
    return True


def _match_software_element_keys(op):
    """
    Validate keys of object path common to package and file check lookups.

    @param op object path with SoftwareElementState, TargetOperatingSystem,
    Name, Version and SoftwareElementID keys
    @return match object of re_nevra applied to SoftwareElementID
    Raises pywbem.CIMError(CIM_ERR_NOT_FOUND) if any check fails.
    """
    if op['SoftwareElementState'] not in ("2", 2):
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                "Only \"Executable\" software element state supported")
    if not check_target_operating_system(op['TargetOperatingSystem']):
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                "Wrong target operating system.")
    if not op['Name'] or not op['Version']:
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                'Both "Name" and "Version" must be given')
    m = re_nevra.match(op['SoftwareElementID'])
    if not m:
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                "Wrong SotwareElementID. Expected valid nevra"
                " (name-epoch:version-release.arch).")
    if op['Version'] != m.group('ver'):
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                "Version does not match version part in SoftwareElementID.")
    return m


def _find_installed_pkg(yb, m, name):
    """
    @param yb entered YumDB instance
    @param m match object of re_nevra
    @param name package name passed to yum.packages.parsePackages
    @return installed package whose evra equals the matched one
    Raises pywbem.CIMError(CIM_ERR_NOT_FOUND) if there is no such package.
    """
    kwargs = dict((k, m.group(k))
            for k in ('name', 'epoch', 'ver', 'rel', 'arch'))
    if not yb.rpmdb.installed(**kwargs):
        raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                "Package not installed")
    pl = yb.doPackageLists('installed')
    exact = yum.packages.parsePackages(pl.installed, [name])[0]
    for pkg in yum.misc.unique(exact):
        if pkg.evra == m.group('evra'):
            return pkg
    raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
            "No matching package installed")


class YumPackage:
    """
    Just a namespace for common function related to YumPackage provider.
    """

    # Lookup tables are built once at class creation instead of on every
    # call. Keys are lowercased strings reported by yum.
    _ARCHITECTURES = {
        "noarch": 0, "i386": 4, "i586": 5, "i686": 6, "x86_64": 7,
        "athlon": 8, "ia32e": 10, "ia64": 11, "powerpc": 12, "ppc": 12,
        "ppc64": 13, "sparc": 15, "sparcv9": 16, "sparc64": 17,
        "arm": 20, "arm6": 21, "arm7": 22, "arm8": 23, "arm9": 24,
        "strongarm": 30, "xscale": 31, "parisc": 35, "parisc11": 36,
        "mips": 40, "mips32": 41, "mips64": 42, "mipsnecvr4xxx": 45,
        "mipsrm7000": 46, "mipstx39": 50, "mipstx49": 51, "v850": 55,
        "alpha": 60, "alphaev6": 61, "alphaev7": 62, "alphaev8": 63,
        "alphaev9": 64, "sh": 70, "sh2": 71, "sh3": 72, "sh4": 73,
        "sh5": 74,
    }

    # NOTE(review): "lppl" is listed twice (177 and 429). The later entry
    # wins in a dict literal, so "lppl" resolves to 429 -- confirm which
    # value is intended.
    _LICENSES = {
        "freely redistributable without restriction": 2,
        "glide": 5, "afl": 6, "adobe": 7, "mit": 10,
        "mit with advertising": 11,
        "agpl": 15, "agplv1": 16, "agplv3": 17, "agplv3+": 18,
        "agplv3 with exceptions": 19,
        "adsl": 25, "amdplpa": 26,
        "asl": 30, "asl 1.0": 31, "asl 1.1": 32, "asl 2.0": 33,
        "artistic clarified": 35, "artistic 2.0": 36,
        "arl": 40, "aal": 41, "bahyph": 42, "barr": 43, "beerware": 44,
        "beopen": 45, "bibtex": 46, "boost": 47, "borceux": 48,
        "bsd": 50, "bsd with advertising": 51, "bsd with attribution": 52,
        "bsd protection": 53, "ampas bsd": 55, "lbnl bsd": 56,
        "ricebsd": 57,
        "catosl": 65, "cecill": 66, "cecill-b": 67, "cecill-c": 68,
        "netscape": 70, "cnri": 71, "cddl": 72, "cpl": 73,
        "condor": 75, "copyright only": 76, "cpal": 77,
        "gpl": 80, "gpl+": 81, "gpl+ with exceptions": 82, "gplv1": 83,
        "gplv2": 84, "gplv2 with exceptions": 85, "gplv2+": 86,
        "gplv2+ with exceptions": 87, "gplv3": 88,
        "gplv3 with exceptions": 89, "gplv3+": 90,
        "gplv3+ with exceptions": 91,
        "gpl+ or artistic": 100, "gplv2 or artistic": 101,
        "gplv2+ or artistic": 102,
        "lgpl": 110, "lgplv2": 111, "lgplv2 with exceptions": 112,
        "lgplv2+": 113, "lgplv3": 115, "lgplv3 with exceptions": 116,
        "lgplv3+": 117,
        "cc0": 125, "crossword": 126, "crystal stacker": 127,
        "mpl": 130, "mplv1.0": 131, "mplv1.1": 132, "mplv2.0": 133,
        "diffmark": 135, "wtfpl": 136, "doc": 137, "dotseqn": 138,
        "dsdp": 139, "dvipdfm": 140, "epl": 141, "ecos": 142,
        "ecl 1.0": 145, "ecl 2.0": 146,
        "egenix": 150, "efl 2.0": 151, "entessa": 152, "erpl": 153,
        "eu datagrid": 154, "eupl 1.1": 155, "eurosym": 156, "fair": 157,
        "ftl": 158, "gitware": 159, "gl2ps": 160, "glulxe": 161,
        "gnuplot": 162, "haskellreport": 163, "ibm": 164, "imatrix": 165,
        "imagemagick": 166, "imlib2": 167, "ijg": 168, "intel acpi": 169,
        "interbase": 170, "isc": 171, "jabber": 172, "jasper": 173,
        "jpython": 174, "julius": 175, "knuth": 176, "lppl": 177,
        "latex2e": 178, "leptonica": 179, "lhcyr": 180, "libtiff": 181,
        "logica": 182, "makeindex": 183, "mecab-ipadic": 184,
        "ms-pl": 185, "ms-rl": 186, "midnight": 187, "miros": 188,
        "mod_macro": 189, "motosoto": 190, "naumen": 191, "ncsa": 192,
        "netcdf": 193, "ngpl": 194, "nosl": 195, "newmat": 196,
        "newsletr": 197, "nokia": 198,
        "nlpl": 200, "noweb": 201, "openldap": 202, "oml": 203,
        "openpbs": 204, "osl 1.0": 205, "osl 1.1": 206, "osl 2.0": 207,
        "osl 2.1": 208, "osl 3.0": 209,
        "openssl": 215, "oreilly": 216, "par": 217,
        "phorum": 220, "php": 221, "plaintex": 222, "plexus": 223,
        "postgresql": 224, "psfrag": 225, "psutils": 226,
        "public domain": 227, "python": 228, "qhull": 229, "qpl": 230,
        "rdisc": 231, "rpsl": 232, "romio": 233,
        "rsfs": 235, "ruby": 236, "saxpath": 237, "scea": 238,
        "scrip": 239, "sendmail": 240, "sleepycat": 241, "slib": 242,
        "snia": 243, "sissl": 244, "spl": 245, "tcl": 246,
        "teeworlds": 247, "tpl": 248, "threeparttable": 249, "tmate": 250,
        "torquev1.1": 251, "tosl": 252, "ucd": 253, "vim": 254,
        "vnlsl": 255, "vostrom": 256, "vsl": 257, "w3c": 258,
        "webmin": 259, "wsuipa": 260, "wxwidgets": 261, "xinetd": 262,
        "xerox": 263, "xpp": 264, "xskat": 265, "yplv1.1": 266,
        "zed": 270, "zend": 271,
        "zplv1.0": 275, "zplv2.0": 276, "zplv2.1": 277,
        "zlib": 280, "zlib with acknowledgement": 281,
        "cdl": 400, "cc-by": 401, "cc-by-sa": 402, "cc-by-nd": 403,
        "dsl": 405, "dmtf": 406, "oal": 407, "fbsddl": 408, "gfdl": 409,
        "ieee": 410, "ldpl": 411, "ofsfdl": 412, "open publication": 413,
        "public use": 414, "efml": 415, "free art": 416, "geotratis": 417,
        "green openmusic": 418, "ofl": 419, "utopia": 420, "ams": 421,
        "arphic": 422, "baekmuk": 423, "bitstream vera": 424,
        "charter": 425, "doublestroke": 426, "ec": 427, "elvish": 428,
        "lppl": 429, "hershey": 430, "ipa": 431, "liberation": 432,
        "lucida": 433, "mgopen": 434, "mplus": 435, "ptfl": 436,
        "punknova": 437, "stix": 438, "wadalab": 439, "xano": 440,
    }

    _GROUPS = {
        "amusements/games": 3,
        "amusements/graphics": 4,
        "applications/archiving": 5,
        "applications/communications": 6,
        "applications/databases": 7,
        "applications/editors": 8,
        "applications/emulators": 9,
        "applications/engineering": 10,
        "applications/file": 11,
        "applications/internet": 12,
        "applications/multimedia": 13,
        "applications/productivity": 14,
        "applications/publishing": 15,
        "applications/system": 16,
        "applications/text": 17,
        "development/build tools": 18,
        "development/debug": 19,
        "development/debuggers": 20,
        "development/documentation": 21,
        "development/java": 22,
        "development/languages": 23,
        "development/libraries": 24,
        "development/libraries/java": 25,
        "development/system": 26,
        "development/tools": 27,
        "documentation": 28,
        "internet/www/dynamic content": 29,
        "system/libraries": 30,
        "system environment/base": 31,
        "system environment/daemons": 32,
        "system environment/kernel": 33,
        "system environment/libraries": 34,
        "system environment/shells": 35,
        "text processing/markup/xml": 36,
        "user interface/desktops": 37,
        "user interface/x": 38,
        "user interface/x hardware support": 39,
        "utilities": 40,
    }

    @staticmethod
    def parse_arch(arch, env):
        """
        @param arch architecture name, matched case insensitively
        @return pywbem.Uint16 value for arch; 2 is returned (and an error
        is logged) for an unknown architecture
        """
        try:
            res = YumPackage._ARCHITECTURES[arch.lower()]
        except KeyError:
            logger = env.get_logger()
            logger.log_error("failed to parse architecture '{}'".format(arch))
            res = 2
        return pywbem.Uint16(res)

    @staticmethod
    def parse_license(lic, env):
        """
        @param lic license name, matched case insensitively
        @return pywbem.Uint16 value for lic; 1 is returned (and an error
        is logged) for an unknown license
        """
        try:
            res = YumPackage._LICENSES[lic.lower()]
        except KeyError:
            logger = env.get_logger()
            logger.log_error("failed to parse license '{}'".format(lic))
            res = 1
        return pywbem.Uint16(res)

    @staticmethod
    def parse_group(group, env):
        """
        @param group package group, matched case insensitively;
        may be None or empty
        @return pywbem.Uint16 value for group; 2 is returned for a missing
        or "unspecified" group and 0 (with an error logged) for
        an unknown one
        """
        # a missing group must reach the "unspecified" branch below
        # instead of failing on None.lower()
        key = (group or "").lower()
        try:
            res = YumPackage._GROUPS[key]
        except KeyError:
            logger = env.get_logger()
            if not group or key == "unspecified":
                logger.log_info("unspecified group '{}'".format(group))
                res = 2
            else:
                logger.log_error("failed to parse group '{}'".format(group))
                res = 0
        return pywbem.Uint16(res)

    @staticmethod
    def object_path2pkg(env, op):
        """
        @param op must be an instance of pywbem.CIMInstanceName
        @return installed package identified by keys of op
        Raises pywbem.CIMError(CIM_ERR_NOT_FOUND) if keys are inconsistent
        or no such package is installed.
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be an instance of CIMInstanceName")
        version = op['Version']
        # a missing Version is reported by _match_software_element_keys
        if (  not op['Name'] or not op['SoftwareElementID']
           or not op['SoftwareElementID'].startswith(op['Name'])
           or (   version is not None
              and version not in op['SoftwareElementID'])):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        m = _match_software_element_keys(op)

        with YumDB.getInstance(env) as yb:
            return _find_installed_pkg(yb, m, op['Name'])

    @staticmethod
    def pkg2model_wrapper(namespace, classname):
        """
        @return a function that transforms YumAvailablePackage object
        to CIMInstanceName object
        """
        # computed once per wrapper
        tos = pywbem.Uint16(get_target_operating_system()[0])

        def pkg2model(pkg, env, keys_only=True, model=None):
            """
            @param keys_only if False, non-key properties are filled as well
            @param model if not None, it is filled with data, otherwise
            a new instance of CIMInstanceName (or CIMInstance when
            keys_only is False) is created
            """
            if not isinstance(pkg, yum.packages.YumAvailablePackage):
                raise TypeError(
                        "pkg must be an instance of YumAvailablePackage")
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            model['Name'] = pkg.name
            model['SoftwareElementID'] = pkg.nevra
            model['SoftwareElementState'] = pywbem.Uint16(2)
            model['TargetOperatingSystem'] = tos
            model['Version'] = pkg.version
            if not keys_only:
                model['Caption'] = pkg.summary
                model['Description'] = pkg.description
                model['InstallDate'] = pywbem.CIMDateTime(
                        datetime.fromtimestamp(pkg.installtime))
                if pkg.vendor:
                    model['Manufacturer'] = pkg.vendor
                model['Release'] = pkg.release
                model['Epoch'] = pywbem.Uint16(pkg.epoch)
                model["Architecture"] = YumPackage.parse_arch(pkg.arch, env)
                model['License'] = YumPackage.parse_license(pkg.license, env)
                model['LicenseString'] = pkg.license
                model['Group'] = YumPackage.parse_group(pkg.group, env)
                model['GroupString'] = pkg.group
                model['Size'] = pywbem.Uint64(pkg.size)
            return model

        return pkg2model


class YumFileCheck:
    """
    Just a namespace for functions related to FileCheck provider.
    """

    # descriptions of PassedFlags items, in the order of FileCheck fields
    # (md5_checksum has no flag)
    passed_flags_descriptions = (
        "Existence",
        "File Type",
        "File Size",
        "File Mode",
        "Checksum",
        "Device major/minor number",
        "Symlink Target",
        "User Ownership", "Group Ownership",
        "Modify Time")

    # inverse of yum.constants.RPM_CHECKSUM_TYPES: algorithm name -> number
    checksumtype_str2num = dict((v, k) for (k, v) in
            yum.constants.RPM_CHECKSUM_TYPES.items())

    @staticmethod
    def checksumtype_num2hash(csumt):
        """
        @param csumt key of yum.constants.RPM_CHECKSUM_TYPES
        @return constructor of the same name from hashlib
        """
        return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])

    @staticmethod
    def checksumtype_str2pywbem(alg):
        """
        @param alg name of checksum algorithm, matched case insensitively
        @return pywbem.Uint16 number of algorithm, 0 if it is not known
        """
        try:
            res = YumFileCheck.checksumtype_str2num[alg.lower()]
        except KeyError:
            res = 0
        return pywbem.Uint16(res)

    @staticmethod
    def filetype_str2pywbem(ft):
        """
        @param ft textual file type
        @return pywbem.Uint16 number of file type, 0 if it is not known
        """
        try:
            return pywbem.Uint16(
                    { 'file': 1
                    , 'directory': 2
                    , 'symlink': 3
                    , 'fifo': 4
                    , 'character device': 5
                    , 'block device': 6
                    }[ft])
        except KeyError:
            return pywbem.Uint16(0)

    @staticmethod
    def filetype_mode2pywbem(mode):
        """
        @param mode st_mode value of a file
        @return pywbem.Uint16 number of file type using the same numbering
        as filetype_str2pywbem, 0 if no test matches
        """
        for i, name in enumerate(
                ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
            if getattr(stat, 'S_IS' + name)(mode):
                return pywbem.Uint16(i)
        return pywbem.Uint16(0)

    @staticmethod
    def mode2pywbem_flags(mode):
        """
        @param mode if None, file does not exist
        @return list of pywbem.Uint8 indexes of permission bits set in mode
        or None if mode is None
        """
        if mode is None:
            return None
        flags = []
        # position in this tuple is the reported flag value
        for i, c in enumerate((
                stat.S_IXOTH,
                stat.S_IWOTH,
                stat.S_IROTH,
                stat.S_IXGRP,
                stat.S_IWGRP,
                stat.S_IRGRP,
                stat.S_IXUSR,
                stat.S_IWUSR,
                stat.S_IRUSR,
                stat.S_ISVTX,
                stat.S_ISGID,
                stat.S_ISUID)):
            if c & mode:
                flags.append(pywbem.Uint8(i))
        return flags

    @staticmethod
    def hashfile(afile, hashers, blocksize=65536):
        """
        @param afile file object opened for reading
        @param hashers is a list of hash objects; a single hash object
        is accepted as well
        @param blocksize number of bytes read at once
        @return list of digest strings (in hex format) for each hash object
        given in the same order
        """
        if not isinstance(hashers, (tuple, list, set, frozenset)):
            hashers = (hashers, )
        buf = afile.read(blocksize)
        while len(buf) > 0:
            for h in hashers:
                h.update(buf)
            buf = afile.read(blocksize)
        return [h.hexdigest() for h in hashers]

    @staticmethod
    def compute_checksums(env, checksum_type, file_type, file_path):
        """
        @param file_type if it is not a regular file, zeroes are returned
        @param checksum_type selected hash algorithm to compute second
        checksum
        @return (md5sum, checksum)
        both checksums are computed from file_path's content
        first one is always md5, the second one depends on checksum_type
        if file can not be read, (None, None) is returned
        """
        hashers = [hashlib.md5()]
        if checksum_type != YumFileCheck.checksumtype_str2num["md5"]:
            hashers.append(YumFileCheck.checksumtype_num2hash(checksum_type)())
        if file_type != YumFileCheck.filetype_str2pywbem('file'):
            # digests of correct length made of zeroes only
            rslts = ['0'*len(h.hexdigest()) for h in hashers]
        else:
            try:
                with open(file_path, 'rb') as f:
                    rslts = YumFileCheck.hashfile(f, hashers)
            except (OSError, IOError) as e:
                env.get_logger().log_error("could not open file \"%s\""
                        " for reading: %s" % (file_path, e))
                return None, None
        # NOTE(review): when checksum_type is md5, the second item is the
        # md5 digest repeated twice -- confirm this is intended.
        return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)

    @staticmethod
    def object_path2yumcheck(env, op):
        """
        @param op must be an instance of pywbem.CIMInstanceName
        @return instance of yum.packages._RPMVerifyPackage
        this object holds YumInstalledPackage under its po attribute
        Raises pywbem.CIMError(CIM_ERR_NOT_FOUND) if keys are inconsistent,
        package is not installed or it does not contain the file.
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be instance of CIMInstanceName, "
                    "not \"%s\"" % op.__class__.__name__)
        version = op['Version']
        # a missing Version is reported by _match_software_element_keys
        if (  not op['Name'] or not op['SoftwareElementID']
           or not op['CheckID']
           or not op['CheckID'].endswith('#'+op['Name'])
           or (   version is not None
              and version not in op['SoftwareElementID'])):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        m = _match_software_element_keys(op)

        with YumDB.getInstance(env) as yb:
            pkg = _find_installed_pkg(yb, m, m.group('name'))
            # header is read while the database is still entered
            fi = pkg.hdr.fiFromHeader()
            vpkg = yum.packages._RPMVerifyPackage(
                    pkg, fi, pkg.yumdb_info.checksum_type, [], True)
            # Name key of a file check holds the file name
            if not op['Name'] in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File not found in RPM package")
        return vpkg

    # Named tuple to store results of rpm file check as pywbem values,
    # all results are in form:
    #   (expected, reality)
    # where
    #   expected is value from rpm package
    #   reality is value obtained from installed file
    # None means, that value could not be obtained
    # except for "exists" and "md5_checksum" attributes, where "exists" is
    # boolean and md5_checksum is a string
    # for example:
    #   file_check.file_type == (4, 3)
    FileCheck = collections.namedtuple('FileCheck',
            'exists, md5_checksum, file_type, file_size, file_mode, '
            'checksum, device, link_target, user_id, group_id, '
            'last_modification_time')

    @staticmethod
    def test_file(env, checksum_type, vpf):
        """
        @param checksum_type is a pywbem value for ChecksumType property
        @param vpf instance of yum.packages._RPMVerifyPackageFile
        @return instance of FileCheck
        """
        if not isinstance(vpf, yum.packages._RPMVerifyPackageFile):
            raise TypeError("vpf must be an instance of"
                    " _RPMVerifyPackageFile,"
                    " not \"%s\"" % vpf.__class__.__name__)
        exists = os.path.lexists(vpf.filename)
        md5_checksum = None
        expected = {
            "file_type": YumFileCheck.filetype_str2pywbem(vpf.ftype),
            "user_id": pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid),
            "group_id": pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid),
            "file_mode": pywbem.Uint32(vpf.mode),
            "file_size": pywbem.Uint64(vpf.size),
            "link_target": vpf.readlink if vpf.readlink else None,
            "checksum": vpf.digest[1],
            "device": (pywbem.Uint64(vpf.dev)
                if vpf.ftype.endswith('device') else None),
            "last_modification_time": pywbem.Uint64(vpf.mtime)
        }
        if not exists:
            # every real value of a missing file is None
            reality = collections.defaultdict(lambda: None)
        else:
            fstat = os.lstat(vpf.filename)
            reality = {
                "file_type": YumFileCheck.filetype_mode2pywbem(fstat.st_mode),
                "user_id": pywbem.Uint32(fstat.st_uid),
                "group_id": pywbem.Uint32(fstat.st_gid),
                "file_mode": pywbem.Uint32(fstat.st_mode),
                "file_size": pywbem.Uint64(fstat.st_size),
                "last_modification_time": pywbem.Uint64(fstat.st_mtime)
            }
            # Mirrors the expected side, which reports a device number only
            # for character and block devices.
            # NOTE(review): st_dev is the device containing the file; for
            # device nodes st_rdev may be the value comparable with vpf.dev
            # -- confirm against yum.
            is_device = (  stat.S_ISCHR(fstat.st_mode)
                        or stat.S_ISBLK(fstat.st_mode))
            reality["device"] = (pywbem.Uint64(fstat.st_dev)
                    if is_device else None)
            reality["link_target"] = (os.readlink(vpf.filename)
                    if os.path.islink(vpf.filename) else None)
            md5_checksum, checksum = YumFileCheck.compute_checksums(
                    env, checksum_type, reality["file_type"], vpf.filename)
            reality["checksum"] = checksum
        kwargs = dict(exists=exists, md5_checksum=md5_checksum,
                **dict((k, (expected[k], reality[k])) for k in expected))
        return YumFileCheck.FileCheck(**kwargs)

    @staticmethod
    def filecheck_passed(fc):
        """
        @param fc instance of FileCheck
        @return True if file exists and all its real values equal
        the expected ones
        """
        if not isinstance(fc, YumFileCheck.FileCheck):
            raise TypeError("fc must be an instance of FileCheck")
        return fc.exists and all(v[0] == v[1]
                for v in fc if isinstance(v, tuple))

    @staticmethod
    def filecheck_wrapper(namespace, classname):
        """
        @return a function that transforms a file of _RPMVerifyPackage
        to CIMInstanceName or CIMInstance object
        """

        def _filecheck2model_flags(fc):
            """
            @return list of booleans, one for "exists" and one for each
            (expected, reality) pair of fc
            """
            flags = []
            for v in fc:
                if isinstance(v, tuple):
                    flags.append(fc.exists and v[0] == v[1])
                elif isinstance(v, bool):
                    flags.append(v)
            return flags

        def filecheck2model(vpkg, fn, env, keys_only=True, model=None):
            """
            @param vpkg instance of yum.packages._RPMVerifyPackage
            @param fn name of file belonging to vpkg
            @param keys_only if False, non-key properties are filled as well
            @param model if not None, it is filled with data, otherwise
            a new instance of CIMInstanceName (or CIMInstance when
            keys_only is False) is created
            """
            if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
                raise TypeError(
                        "vpkg must be an instance of _RPMVerifyPackage")
            if not fn in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File \"%s\" not found among package files" % fn)
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            pkg = vpkg.po
            vpf = vpkg._files[fn]
            model['Name'] = vpf.filename
            model['SoftwareElementID'] = pkg.nevra
            model['SoftwareElementState'] = pywbem.Uint16(2)
            model['TargetOperatingSystem'] = pywbem.Uint16(
                    get_target_operating_system()[0])
            model['Version'] = pkg.version
            model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
            if not keys_only:
                # TODO: Caption, CheckMode, CRC1, CRC2, CreateTimeStamp,
                # Description, ElementName and InstanceID are not filled
                model['FileName'] = os.path.basename(vpf.filename)
                model['ChecksumType'] = csumt = \
                        YumFileCheck.checksumtype_str2pywbem(
                                pkg.yumdb_info.checksum_type)
                fc = YumFileCheck.test_file(env, csumt, vpf)
                for mattr, fattr in (
                        ('FileType', 'file_type'),
                        ('FileUserID', 'user_id'),
                        ('FileGroupID', 'group_id'),
                        ('FileMode', 'file_mode'),
                        ('LastModificationTime', 'last_modification_time'),
                        ('FileSize', 'file_size'),
                        ('LinkTarget', 'link_target'),
                        ('Checksum', 'checksum')):
                    exp, rea = getattr(fc, fattr)
                    if exp is not None:
                        model['Expected' + mattr] = exp
                    if rea is not None:
                        model[mattr] = rea
                model['ExpectedFileModeFlags'] = \
                        YumFileCheck.mode2pywbem_flags(fc.file_mode[0])
                if fc.exists:
                    model['FileModeFlags'] = YumFileCheck.mode2pywbem_flags(
                            fc.file_mode[1])
                model['FileExists'] = fc.exists
                if fc.md5_checksum is not None:
                    model['MD5Checksum'] = fc.md5_checksum
                model['PassedFlags'] = _filecheck2model_flags(fc)
                model['PassedFlagsDescriptions'] = list(
                        YumFileCheck.passed_flags_descriptions)
            return model

        return filecheck2model