# -*- encoding: utf-8 -*- # Software Management Providers # # Copyright (C) 2012 Red Hat, Inc. All rights reserved. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """Common utilities for LMI_Yum* providers """ import collections from datetime import datetime import grp import hashlib import itertools import os import platform import pwd import re import socket import stat import pywbem import yum import cmpi_pywbem_bindings as pycimmb import singletonmixin re_evra = re.compile(r'^(?P\d+):(?P[^-]+)' r'-(?P.+)\.(?P[^.]+)$') re_nevra = re.compile(r'^(?P.+)-(?P(?P\d+):(?P[^-]+)' r'-(?P.+)\.(?P[^.]+))$') rpmdb_path = '/var/lib/rpm/Packages' class YumDB(singletonmixin.Singleton): ignoreSubsequent = True def __init__(self, env, *args, **kwargs): if not isinstance(env, pycimmb.ProviderEnvironment): raise TypeError("env must be instance of" " pycimmb.ProviderEnvironment") self._yum_args = (args, kwargs) self._yum = None self._db_mtime = 0 self._lock_cnt = 0 self.env = env env.get_logger().log_info('init called') def is_dirty(self): return self._db_mtime < os.stat(rpmdb_path).st_mtime def is_locked(self): return self._yum._lockfile is not None def update_db(self): self.env.get_logger().log_info('updating rpmdb') self._db_mtime = os.stat(rpmdb_path).st_mtime self._yum.doConfigSetup() self._yum.doTsSetup() self._yum.doRpmDBSetup() def __enter__(self): self._lock_cnt += 1 if self._lock_cnt < 2: self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1]) if not self.is_locked() and self.is_dirty(): self.update_db() self._yum.doLock() return self def __exit__(self, exc_type, exc_value, traceback): if self._lock_cnt == 1: del self._yum self._lock_cnt -= 1 def __getattr__(self, name): if not self.is_locked() and self.is_dirty(): self.update_db() return getattr(self._yum, name) def _get_distname(): if hasattr(platform, 'linux_distribution'): return platform.linux_distribution( full_distribution_name=False)[0].lower() else: return platform.dist()[0].lower() def get_target_operating_system(): """ @return (val, text). Where val is a number from ValueMap of TargetOperatingSystem property of CIM_SoftwareElement class and text is its testual representation. """ system = platform.system() if system.lower() == 'linux': try: val, dist = \ { 'redhat' : (79, 'RedHat Enterprise Linux') , 'suse' : (81, 'SUSE') , 'mandriva' : (88, 'Mandriva') , 'ubuntu' : (93, 'Ubuntu') , 'debian' : (95, 'Debian') }[_get_distname()] except KeyError: linrel = platform.uname()[2] if linrel.startswith('2.4'): val, dist = (97, 'Linux 2.4.x') elif linrel.startswith('2.6'): val, dist = (99, 'Linux 2.6.x') else: return (36, 'LINUX') # no check for x86_64 if platform.machine() == 'x86_64': val += 1 dist += ' 64-Bit' return (val, dist) elif system.lower() in ('macosx', 'darwin'): return (2, 'MACOS') # elif system.lower() == 'windows': # no general value else: return (0, 'Unknown') def check_target_operating_system(system): """ @return if param system matches current target operating system """ if isinstance(system, basestring): system = int(system) if not isinstance(system, (int, long)): raise TypeError("system must be either string or integer, not {}" .format(system.__class__.__name__)) tos = get_target_operating_system() if system == tos: return True if system == 36: # linux if platform.system().lower() == "linux": return True if ( system >= 97 and system <= 100 # linux 2.x.x and platform.uname()[2].startswith('2.4' if system < 99 else '2.6') # check machine and ( bool(platform.machine().endswith('64')) == bool(not (system % 2)))): return True return False def match_pkg(pkg, **kwargs): """ all not Null and not empty arguments will be matched against pkg attributes; if all of them match, return True possible arguments: name, epoch, version, release, arch evra, nevra """ for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'): v = kwargs.get(a, None) if v and getattr(pkg, a) != v: return False return True class YumPackage: """ Just a namespace for common function related to YumPackage provider. """ @staticmethod def parse_arch(arch, env): try: res = \ { "noarch" : 0 , "i386" : 4 , "i586" : 5 , "i686" : 6 , "x86_64" : 7 , "athlon" : 8 , "ia32e" : 10 , "ia64" : 11 , "powerpc" : 12 , "ppc" : 12 , "ppc64" : 13 , "sparc" : 15 , "sparcv9" : 16 , "sparc64" : 17 , "arm" : 20 , "arm6" : 21 , "arm7" : 22 , "arm8" : 23 , "arm9" : 24 , "strongarm" : 30 , "xscale" : 31 , "parisc" : 35 , "parisc11" : 36 , "mips" : 40 , "mips32" : 41 , "mips64" : 42 , "mipsnecvr4xxx" : 45 , "mipsrm7000" : 46 , "mipstx39" : 50 , "mipstx49" : 51 , "v850" : 55 , "alpha" : 60 , "alphaev6" : 61 , "alphaev7" : 62 , "alphaev8" : 63 , "alphaev9" : 64 , "sh" : 70 , "sh2" : 71 , "sh3" : 72 , "sh4" : 73 , "sh5" : 74 }[arch.lower()] except KeyError: logger = env.get_logger() logger.log_error("failed to parse architecture '{}'".format(arch)) res = 2 return pywbem.Uint16(res) @staticmethod def parse_license(lic, env): try: res = \ { "freely redistributable without restriction" : 2 , "glide" : 5 , "afl" : 6 , "adobe" : 7 , "mit" : 10 , "mit with advertising" : 11 , "agpl" : 15 , "agplv1" : 16 , "agplv3" : 17 , "agplv3+" : 18 , "agplv3 with exceptions" : 19 , "adsl" : 25 , "amdplpa" : 26 , "asl" : 30 , "asl 1.0" : 31 , "asl 1.1" : 32 , "asl 2.0" : 33 , "artistic clarified" : 35 , "artistic 2.0" : 36 , "arl" : 40 , "aal" : 41 , "bahyph" : 42 , "barr" : 43 , "beerware" : 44 , "beopen" : 45 , "bibtex" : 46 , "boost" : 47 , "borceux" : 48 , "bsd" : 50 , "bsd with advertising" : 51 , "bsd with attribution" : 52 , "bsd protection" : 53 , "ampas bsd" : 55 , "lbnl bsd" : 56 , "ricebsd" : 57 , "catosl" : 65 , "cecill" : 66 , "cecill-b" : 67 , "cecill-c" : 68 , "netscape" : 70 , "cnri" : 71 , "cddl" : 72 , "cpl" : 73 , "condor" : 75 , "copyright only" : 76 , "cpal" : 77 , "gpl" : 80 , "gpl+" : 81 , "gpl+ with exceptions" : 82 , "gplv1" : 83 , "gplv2" : 84 , "gplv2 with exceptions" : 85 , "gplv2+" : 86 , "gplv2+ with exceptions" : 87 , "gplv3" : 88 , "gplv3 with exceptions" : 89 , "gplv3+" : 90 , "gplv3+ with exceptions" : 91 , "gpl+ or artistic" : 100 , "gplv2 or artistic" : 101 , "gplv2+ or artistic" : 102 , "lgpl" : 110 , "lgplv2" : 111 , "lgplv2 with exceptions" : 112 , "lgplv2+" : 113 , "lgplv3" : 115 , "lgplv3 with exceptions" : 116 , "lgplv3+" : 117 , "cc0" : 125 , "crossword" : 126 , "crystal stacker" : 127 , "mpl" : 130 , "mplv1.0" : 131 , "mplv1.1" : 132 , "mplv2.0" : 133 , "diffmark" : 135 , "wtfpl" : 136 , "doc" : 137 , "dotseqn" : 138 , "dsdp" : 139 , "dvipdfm" : 140 , "epl" : 141 , "ecos" : 142 , "ecl 1.0" : 145 , "ecl 2.0" : 146 , "egenix" : 150 , "efl 2.0" : 151 , "entessa" : 152 , "erpl" : 153 , "eu datagrid" : 154 , "eupl 1.1" : 155 , "eurosym" : 156 , "fair" : 157 , "ftl" : 158 , "gitware" : 159 , "gl2ps" : 160 , "glulxe" : 161 , "gnuplot" : 162 , "haskellreport" : 163 , "ibm" : 164 , "imatrix" : 165 , "imagemagick" : 166 , "imlib2" : 167 , "ijg" : 168 , "intel acpi" : 169 , "interbase" : 170 , "isc" : 171 , "jabber" : 172 , "jasper" : 173 , "jpython" : 174 , "julius" : 175 , "knuth" : 176 , "lppl" : 177 , "latex2e" : 178 , "leptonica" : 179 , "lhcyr" : 180 , "libtiff" : 181 , "logica" : 182 , "makeindex" : 183 , "mecab-ipadic" : 184 , "ms-pl" : 185 , "ms-rl" : 186 , "midnight" : 187 , "miros" : 188 , "mod_macro" : 189 , "motosoto" : 190 , "naumen" : 191 , "ncsa" : 192 , "netcdf" : 193 , "ngpl" : 194 , "nosl" : 195 , "newmat" : 196 , "newsletr" : 197 , "nokia" : 198 , "nlpl" : 200 , "noweb" : 201 , "openldap" : 202 , "oml" : 203 , "openpbs" : 204 , "osl 1.0" : 205 , "osl 1.1" : 206 , "osl 2.0" : 207 , "osl 2.1" : 208 , "osl 3.0" : 209 , "openssl" : 215 , "oreilly" : 216 , "par" : 217 , "phorum" : 220 , "php" : 221 , "plaintex" : 222 , "plexus" : 223 , "postgresql" : 224 , "psfrag" : 225 , "psutils" : 226 , "public domain" : 227 , "python" : 228 , "qhull" : 229 , "qpl" : 230 , "rdisc" : 231 , "rpsl" : 232 , "romio" : 233 , "rsfs" : 235 , "ruby" : 236 , "saxpath" : 237 , "scea" : 238 , "scrip" : 239 , "sendmail" : 240 , "sleepycat" : 241 , "slib" : 242 , "snia" : 243 , "sissl" : 244 , "spl" : 245 , "tcl" : 246 , "teeworlds" : 247 , "tpl" : 248 , "threeparttable" : 249 , "tmate" : 250 , "torquev1.1" : 251 , "tosl" : 252 , "ucd" : 253 , "vim" : 254 , "vnlsl" : 255 , "vostrom" : 256 , "vsl" : 257 , "w3c" : 258 , "webmin" : 259 , "wsuipa" : 260 , "wxwidgets" : 261 , "xinetd" : 262 , "xerox" : 263 , "xpp" : 264 , "xskat" : 265 , "yplv1.1" : 266 , "zed" : 270 , "zend" : 271 , "zplv1.0" : 275 , "zplv2.0" : 276 , "zplv2.1" : 277 , "zlib" : 280 , "zlib with acknowledgement" : 281 , "cdl" : 400 , "cc-by" : 401 , "cc-by-sa" : 402 , "cc-by-nd" : 403 , "dsl" : 405 , "dmtf" : 406 , "oal" : 407 , "fbsddl" : 408 , "gfdl" : 409 , "ieee" : 410 , "ldpl" : 411 , "ofsfdl" : 412 , "open publication" : 413 , "public use" : 414 , "efml" : 415 , "free art" : 416 , "geotratis" : 417 , "green openmusic" : 418 , "ofl" : 419 , "utopia" : 420 , "ams" : 421 , "arphic" : 422 , "baekmuk" : 423 , "bitstream vera" : 424 , "charter" : 425 , "doublestroke" : 426 , "ec" : 427 , "elvish" : 428 , "lppl" : 429 , "hershey" : 430 , "ipa" : 431 , "liberation" : 432 , "lucida" : 433 , "mgopen" : 434 , "mplus" : 435 , "ptfl" : 436 , "punknova" : 437 , "stix" : 438 , "wadalab" : 439 , "xano" : 440 }[lic.lower()] except KeyError: logger = env.get_logger() logger.log_error("failed to parse license '{}'".format(lic)) res = 1 return pywbem.Uint16(res) @staticmethod def parse_group(group, env): try: res = \ { "amusements/games" : 3 , "amusements/graphics" : 4 , "applications/archiving" : 5 , "applications/communications" : 6 , "applications/databases" : 7 , "applications/editors" : 8 , "applications/emulators" : 9 , "applications/engineering" : 10 , "applications/file" : 11 , "applications/internet" : 12 , "applications/multimedia" : 13 , "applications/productivity" : 14 , "applications/publishing" : 15 , "applications/system" : 16 , "applications/text" : 17 , "development/build tools" : 18 , "development/debug" : 19 , "development/debuggers" : 20 , "development/documentation" : 21 , "development/java" : 22 , "development/languages" : 23 , "development/libraries" : 24 , "development/libraries/java" : 25 , "development/system" : 26 , "development/tools" : 27 , "documentation" : 28 , "internet/www/dynamic content" : 29 , "system/libraries" : 30 , "system environment/base" : 31 , "system environment/daemons" : 32 , "system environment/kernel" : 33 , "system environment/libraries" : 34 , "system environment/shells" : 35 , "text processing/markup/xml" : 36 , "user interface/desktops" : 37 , "user interface/x" : 38 , "user interface/x hardware support" : 39 , "utilities" : 40 }[group.lower()] except KeyError: logger = env.get_logger() if not group or group.lower() == "unspecified": logger.log_info("unspecified group '{}'".format(group)) res = 2 else: logger.log_error("failed to parse group '{}'".format(group)) res = 0 return pywbem.Uint16(res) @staticmethod def object_path2pkg(env, op, package_list='installed'): """ @param package_list one of {'installed', 'all', 'available'} says, where to look for given package """ if not isinstance(op, pywbem.CIMInstanceName): raise TypeError("op must be an instance of CIMInstanceName") if not isinstance(package_list, basestring): raise TypeError("package_list must be a string") if not package_list in ('installed', 'all', 'available'): raise ValueError('unsupported package list "%s"'%package_list) tos = get_target_operating_system()[0] if ( not op['Name'] or not op['SoftwareElementID'] or not op['SoftwareElementID'].startswith(op['Name']) or op['SoftwareElementID'].find(op['Version']) == -1): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") # if op['SoftwareElementState'] not in ("2", 2): # raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, # "Only \"Executable\" software element state supported") if not check_target_operating_system(op['TargetOperatingSystem']): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong target operating system.") if not op['Name'] or not op['Version']: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, 'Both "Name" and "Version" must be given') m = re_nevra.match(op['SoftwareElementID']) if not m: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong SotwareElementID. Expected valid nevra" " (name-epoch:version-release.arch).") if op['Version'] != m.group('ver'): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Version does not match version part in SoftwareElementID.") kwargs = dict((k, m.group(k)) for k in ('name', 'epoch', 'ver', 'rel', 'arch')) with YumDB.getInstance(env) as yb: if ( package_list == 'installed' and not yb.rpmdb.installed(**kwargs)): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Package not installed") pl = yb.doPackageLists(package_list, showdups=package_list != 'installed') if package_list != 'all': pl = getattr(pl, package_list) else: # NOTE: available ∩ installed = ∅ pl = itertools.chain(pl.available, pl.installed) exact,_,_ = yum.packages.parsePackages(pl, [op['Name']]) for pkg in yum.misc.unique(exact): if pkg.evra == m.group('evra'): break else: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "No matching package found") return pkg @staticmethod def pkg2model_wrapper(namespace, classname): """ @return a function that transforms YumAvailablePackage object to CIMInstanceName object """ tos = pywbem.Uint16(get_target_operating_system()[0]) def pkg2model(pkg, env, keys_only=True, model=None): """ @param model if None, will be filled with data, otherwise a new instance of CIMInstance or CIMObjectPath is created """ #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): #if not isinstance(pkg, yum.packages.YumHeaderPackage): if not isinstance(pkg, yum.packages.YumAvailablePackage): raise TypeError( "pkg must be an instance of YumAvailablePackage") if model is None: model = pywbem.CIMInstanceName(classname, namespace=namespace) if not keys_only: model = pywbem.CIMInstance(classname, path=model) if isinstance(model, pywbem.CIMInstance): def _set_key(k, v): model[k] = v model.path[k] = v else: _set_key = model.__setitem__ with YumDB.getInstance(env): _set_key('Name', pkg.name) _set_key('SoftwareElementID', pkg.nevra) _set_key('SoftwareElementState', pywbem.Uint16(2 if isinstance(pkg, yum.rpmsack.RPMInstalledPackage) else 1)) _set_key('TargetOperatingSystem', tos) _set_key('Version', pkg.version) if not keys_only: model['Caption'] = pkg.summary model['Description'] = pkg.description if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): model['InstallDate'] = pywbem.CIMDateTime( datetime.fromtimestamp(pkg.installtime)) if pkg.vendor: model['Manufacturer'] = pkg.vendor model['Release'] = pkg.release model['Epoch'] = pywbem.Uint16(pkg.epoch) model["Architecture"] = YumPackage.parse_arch( pkg.arch, env) model['License'] = YumPackage.parse_license( pkg.license, env) model['LicenseString']= pkg.license model['Group'] = YumPackage.parse_group( pkg.group, env) model['GroupString'] = pkg.group model['Size'] = pywbem.Uint64(pkg.size) return model return pkg2model class YumFileCheck: """ Just a namespace for functions related to FileCheck provider. """ passed_flags_descriptions = ( "Existence", "File Type", "File Size", "File Mode", "Checksum", "Device major/minor number", "Symlink Target", "User Ownership", "Group Ownership", "Modify Time") checksumtype_str2num = dict((v, k) for (k, v) in yum.constants.RPM_CHECKSUM_TYPES.items()) @staticmethod def checksumtype_num2hash(csumt): return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt]) @staticmethod def checksumtype_str2pywbem(alg): try: res = YumFileCheck.checksumtype_str2num[alg.lower()] except KeyError: res = 0 return pywbem.Uint16(res) @staticmethod def filetype_str2pywbem(ft): try: return pywbem.Uint16( { 'file' : 1 , 'directory' : 2 , 'symlink' : 3 , 'fifo' : 4 , 'character device' : 5 , 'block device' : 6 }[ft]) except KeyError: return pywbem.Uint16(0) @staticmethod def filetype_mode2pywbem(mode): for i, name in enumerate( ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1): if getattr(stat, 'S_IS' + name)(mode): return pywbem.Uint16(i) return pywbem.Uint16(0) @staticmethod def mode2pywbem_flags(mode): """ @param mode if None, file does not exist """ if mode is None: return None flags = [] for i, c in enumerate(( stat.S_IXOTH, stat.S_IWOTH, stat.S_IROTH, stat.S_IXGRP, stat.S_IWGRP, stat.S_IRGRP, stat.S_IXUSR, stat.S_IWUSR, stat.S_IRUSR, stat.S_ISVTX, stat.S_ISGID, stat.S_ISUID)): if c & mode: flags.append(pywbem.Uint8(i)) return flags @staticmethod def hashfile(afile, hashers, blocksize=65536): """ @param hashers is a list of hash objects @return list of digest strings (in hex format) for each hash object given in the same order """ if not isinstance(hashers, (tuple, list, set, frozenset)): hashers = (hashers, ) buf = afile.read(blocksize) while len(buf) > 0: for h in hashers: h.update(buf) buf = afile.read(blocksize) return [ h.hexdigest() for h in hashers ] @staticmethod def compute_checksums(env, checksum_type, file_type, file_path): """ @param file_type is not a file, then zeroes are returned @param checksum_type selected hash algorithm to compute second checksum @return (md5sum, checksum) both checksums are computed from file_path's content first one is always md5, the second one depends on checksum_type if file does not exists, (None, None) is returned """ hashers = [hashlib.md5()] if checksum_type != YumFileCheck.checksumtype_str2num["md5"]: hashers.append(YumFileCheck.checksumtype_num2hash(checksum_type)()) if file_type != YumFileCheck.filetype_str2pywbem('file'): rslts = ['0'*len(h.hexdigest()) for h in hashers] else: try: with open(file_path, 'rb') as f: rslts = YumFileCheck.hashfile(f, hashers) except (OSError, IOError) as e: env.get_logger().log_error("could not open file \"%s\"" " for reading: %s" % (file_path, e)) return None, None return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2) @staticmethod def object_path2yumcheck(env, op): """ @return instance of yum.packages._RPMVerifyPackage this object holds YumInstalledPackage under its po attribute """ if not isinstance(op, pywbem.CIMInstanceName): raise TypeError("op must be instance of CIMInstanceName, " "not \"%s\"" % op.__class__.__name__) log = env.get_logger() tos = get_target_operating_system()[0] if ( not op['Name'] or not op['SoftwareElementID'] or not op['CheckID'] or not op['CheckID'].endswith('#'+op['Name']) or op['SoftwareElementID'].find(op['Version']) == -1): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") if op['SoftwareElementState'] not in ("2", 2): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Only \"Executable\" software element state supported") if not check_target_operating_system(op['TargetOperatingSystem']): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong target operating system.") if not op['Name'] or not op['Version']: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, 'Both "Name" and "Version" must be given') m = re_nevra.match(op['SoftwareElementID']) if not m: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong SotwareElementID. Expected valid nevra" " (name-epoch:version-release.arch).") if op['Version'] != m.group('ver'): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Version does not match version part in SoftwareElementID.") kwargs = dict((k, m.group(k)) for k in ('name', 'epoch', 'ver', 'rel', 'arch')) with YumDB.getInstance(env) as yb: if not yb.rpmdb.installed(**kwargs): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Package not installed") pl = yb.doPackageLists('installed') exact, matched, unmatched = yum.packages.parsePackages( pl.installed, [kwargs['name']]) exact = yum.misc.unique(exact) for pkg in exact: if pkg.evra == m.group('evra'): break else: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "No matching package installed") fi = pkg.hdr.fiFromHeader() vpkg = yum.packages._RPMVerifyPackage( pkg, fi, pkg.yumdb_info.checksum_type, [], True) if not op['Name'] in vpkg: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "File not found in RPM package") return vpkg """ Named tuple to store results of rpm file check as pywbem values, all results are in form: (expected, reality) where expected is value from rpm package reality is value obtained from installed file None means, that value could not be obtained except for "exists" and "md5_checksum" attributes, where "exists" is boolean and md5_checksum is a string for example: file_check.file_type == (4, 3) """ FileCheck = collections.namedtuple('FileCheck', 'exists, md5_checksum, file_type, file_size, file_mode, ' 'checksum, device, link_target, user_id, group_id, ' 'last_modification_time') @staticmethod def test_file(env, checksum_type, vpf): """ @param checksum type is a pywbem value for ChecksumType property @return instance of FileCheck """ if not isinstance(vpf, yum.packages._RPMVerifyPackageFile): raise TypeError("vpf must be an instance of _RPMVerifyPackage," " not \"%s\"" % vpf.__class__.__name__) exists = os.path.lexists(vpf.filename) md5_checksum = None expected = { "file_type" : YumFileCheck.filetype_str2pywbem(vpf.ftype), "user_id" : pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid), "group_id" : pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid), "file_mode" : pywbem.Uint32(vpf.mode), "file_size" : pywbem.Uint64(vpf.size), "link_target" : vpf.readlink if vpf.readlink else None, "checksum" : vpf.digest[1], "device" : (pywbem.Uint64(vpf.dev) if vpf.ftype.endswith('device') else None), "last_modification_time" : pywbem.Uint64(vpf.mtime) } if not exists: reality = collections.defaultdict(lambda: None) else: fstat = os.lstat(vpf.filename) reality = { "file_type" : YumFileCheck.filetype_mode2pywbem(fstat.st_mode), "user_id" : pywbem.Uint32(fstat.st_uid), "group_id" : pywbem.Uint32(fstat.st_gid), "file_mode" : pywbem.Uint32(fstat.st_mode), "file_size" : pywbem.Uint64(fstat.st_size), "last_modification_time" : pywbem.Uint64(fstat.st_mtime) } reality["device"] = (pywbem.Uint64(fstat.st_dev) if reality['file_type'] == YumFileCheck.filetype_str2pywbem( "device") else None) reality["link_target"] = (os.readlink(vpf.filename) if os.path.islink(vpf.filename) else None) md5_checksum, checksum = YumFileCheck.compute_checksums( env, checksum_type, reality["file_type"], vpf.filename) reality["checksum"] = checksum kwargs = dict(exists=exists, md5_checksum=md5_checksum, **dict((k, (expected[k], reality[k])) for k in expected)) return YumFileCheck.FileCheck(**kwargs) @staticmethod def filecheck_passed(fc): if not isinstance(fc, YumFileCheck.FileCheck): raise TypeError("fc must be an instance of FileCheck") return fc.exists and all(v[0] == v[1] for v in fc if isinstance(v, tuple)) @staticmethod def filecheck_wrapper(namespace, classname): def _filecheck2model_flags(fc): flags = [] for v in fc: if isinstance(v, tuple): flags.append(fc.exists and v[0] == v[1]) elif isinstance(v, bool): flags.append(v) return flags def filecheck2model(vpkg, fn, env, keys_only=True, model=None, fc=None): if not isinstance(vpkg, yum.packages._RPMVerifyPackage): raise TypeError( "vpkg must be an instance of _RPMVerifyPackage") if not fn in vpkg: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "File \"%s\" not found among package files" % fn) if model is None: model = pywbem.CIMInstanceName(classname, namespace=namespace) if not keys_only: model = pywbem.CIMInstance(classname, path=model) if fc is not None: if not isinstance(fc, YumFileCheck.FileCheck): raise TypeError("fc must be an instance of FileCheck") pkg = vpkg.po vpf = vpkg._files[fn] model['Name'] = vpf.filename model['SoftwareElementID'] = pkg.nevra model['SoftwareElementState'] = pywbem.Uint16(2) model['TargetOperatingSystem'] = pywbem.Uint16( get_target_operating_system()[0]) model['Version'] = pkg.version model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename) if not keys_only: #model['Caption'] = '' # TODO #model['CheckMode'] = bool() # TODO #model['CRC1'] = pywbem.Uint32() # TODO #model['CRC2'] = pywbem.Uint32() # TODO #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO #model['Description'] = '' # TODO #model['ElementName'] = '' # TODO #model['InstanceID'] = '' # TODO model['FileName'] = os.path.basename(vpf.filename) model['ChecksumType'] = csumt = \ YumFileCheck.checksumtype_str2pywbem( pkg.yumdb_info.checksum_type) if fc is None: fc = YumFileCheck.test_file(env, csumt, vpf) for mattr, fattr in ( ('FileType', 'file_type'), ('FileUserID', 'user_id'), ('FileGroupID', 'group_id'), ('FileMode', 'file_mode'), ('LastModificationTime', 'last_modification_time'), ('FileSize', 'file_size'), ('LinkTarget', 'link_target'), ('Checksum', 'checksum')): exp, rea = getattr(fc, fattr) if exp is not None: model['Expected' + mattr] = exp if rea is not None: model[mattr] = rea model['ExpectedFileModeFlags'] = \ YumFileCheck.mode2pywbem_flags(fc.file_mode[0]) if fc.exists: model['FileModeFlags'] = YumFileCheck.mode2pywbem_flags( fc.file_mode[1]) model['FileExists'] = fc.exists if fc.md5_checksum is not None: model['MD5Checksum'] = fc.md5_checksum model['PassedFlags'] = _filecheck2model_flags(fc) model['PassedFlagsDescriptions'] = list( YumFileCheck.passed_flags_descriptions) return model return filecheck2model