# Software Management Providers
#
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Common utilities for LMI_Yum* providers
"""
import collections
from datetime import datetime
import grp
import hashlib
import os
import platform
import pwd
import re
import socket
import stat
import pywbem
import yum
import cmpi_pywbem_bindings as pycimmb
import singletonmixin
# Matches "epoch:version-release.arch" and exposes the parts as the named
# groups "epoch", "ver", "rel" and "arch".
re_evra = re.compile(
    r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')

# Matches "name-epoch:version-release.arch". Besides the groups of re_evra
# it exposes "name" and "evra" (everything after the name).
re_nevra = re.compile(
    r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')

# File whose modification time tells YumDB whether the rpm database changed.
rpmdb_path = '/var/lib/rpm/Packages'
class YumDB(singletonmixin.Singleton):
    """
    Singleton wrapper around a yum.YumBase object.

    Attributes that YumDB does not define itself are forwarded to the
    wrapped YumBase instance. Before forwarding, and before taking the lock,
    the rpm database is set up again if the file at rpmdb_path is newer than
    the last refresh.

    The object is a re-entrant context manager: the outermost "with" takes
    the yum lock, and leaving the outermost "with" closes the rpm database
    and releases the lock. Nested "with" blocks only adjust a counter.
    """

    # NOTE(review): presumably consumed by singletonmixin.Singleton so that
    # repeated getInstance() calls with arguments are tolerated -- confirm.
    ignoreSubsequent = True

    def __init__(self, env, *args, **kwargs):
        """
        @param env provider environment, used for logging
        @param args, kwargs passed on to yum.YumBase()
        @raise TypeError if env is not a pycimmb.ProviderEnvironment
        """
        if not isinstance(env, pycimmb.ProviderEnvironment):
            raise TypeError("env must be instance of"
                    " pycimmb.ProviderEnvironment")
        self._yum = yum.YumBase(*args, **kwargs)
        self._db_mtime = 0      # mtime of rpmdb_path seen by last update_db()
        self._lock_cnt = 0      # nesting depth of active "with" blocks
        self.env = env
        env.get_logger().log_info('init called')

    def is_dirty(self):
        """
        @return True if rpmdb_path was modified after the last update_db()
        """
        return self._db_mtime < os.stat(rpmdb_path).st_mtime

    def is_locked(self):
        """
        @return True if the wrapped YumBase currently holds a lock file
        """
        return self._yum._lockfile is not None

    def update_db(self):
        """
        Remember the current mtime of rpmdb_path and set up yum's
        configuration, transaction set and rpm database again.
        """
        self.env.get_logger().log_info('updating rpmdb')
        self._db_mtime = os.stat(rpmdb_path).st_mtime
        self._yum.doConfigSetup()
        self._yum.doTsSetup()
        self._yum.doRpmDBSetup()

    def __enter__(self):
        """
        Take the yum lock on the outermost entry, refreshing stale data first.

        @return self
        """
        if self._lock_cnt < 1:
            if not self.is_locked() and self.is_dirty():
                self.update_db()
            self._yum.doLock()
        # Count the entry only once everything above succeeded: __exit__ is
        # not called when __enter__ raises, so counting earlier would leave
        # the counter stuck and no later "with" would ever lock again.
        self._lock_cnt += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the rpm database and release the lock when the outermost
        "with" block is left; inner blocks only decrement the counter.
        """
        if self._lock_cnt < 1:
            return
        try:
            if self._lock_cnt == 1:
                try:
                    self._yum.closeRpmDB()
                finally:
                    # release the lock even if closing the database failed
                    self._yum.doUnlock()
        finally:
            self._lock_cnt -= 1

    def __getattr__(self, name):
        """
        Forward unknown attributes to the wrapped YumBase object, refreshing
        stale data first when no lock is held.
        """
        if name in ('_yum', '_db_mtime'):
            # These are plain instance attributes; ending up here means
            # __init__ has not set them. Without this guard is_locked() and
            # is_dirty() below would re-enter __getattr__ endlessly.
            raise AttributeError(name)
        if not self.is_locked() and self.is_dirty():
            self.update_db()
        return getattr(self._yum, name)
def _get_distname():
    """
    @return lower-cased short distribution name reported by the platform
    module, or an empty string when the interpreter offers neither
    platform.linux_distribution() nor platform.dist()
    """
    if hasattr(platform, 'linux_distribution'):
        return platform.linux_distribution(
                full_distribution_name=False)[0].lower()
    if hasattr(platform, 'dist'):
        return platform.dist()[0].lower()
    # Both helpers are missing on recent interpreters. An empty name makes
    # the caller fall back to its kernel-release based classification
    # instead of dying with AttributeError.
    return ''


def get_target_operating_system():
    """
    @return (val, text).
    Where val is a number from ValueMap of TargetOperatingSystem property
    of CIM_SoftwareElement class and text is its textual representation.
    """
    system = platform.system().lower()
    if system == 'linux':
        known_distributions = {
            'redhat': (79, 'RedHat Enterprise Linux'),
            'suse': (81, 'SUSE'),
            'mandriva': (88, 'Mandriva'),
            'ubuntu': (93, 'Ubuntu'),
            'debian': (95, 'Debian'),
        }
        try:
            val, dist = known_distributions[_get_distname()]
        except KeyError:
            # unknown distribution: classify by kernel release instead
            linrel = platform.uname()[2]
            if linrel.startswith('2.4'):
                val, dist = (97, 'Linux 2.4.x')
            elif linrel.startswith('2.6'):
                val, dist = (99, 'Linux 2.6.x')
            else:
                return (36, 'LINUX')    # no check for x86_64
        # every value chosen above is directly followed by its 64-bit
        # variant
        if platform.machine() == 'x86_64':
            val += 1
            dist += ' 64-Bit'
        return (val, dist)
    if system in ('macosx', 'darwin'):
        return (2, 'MACOS')
    # windows has no general value
    return (0, 'Unknown')
def check_target_operating_system(system):
    """
    Tell whether the given TargetOperatingSystem value fits this machine.

    A value matches when it equals the number returned by
    get_target_operating_system(), when it is the generic linux value (36)
    and we run on linux, or when it is one of the "Linux 2.4.x"/"Linux 2.6.x"
    values (97-100) and both the kernel release and the machine word size
    agree with it.

    @param system number, or a string holding a number
    @return if param system matches current target operating system
    @raise TypeError if system is neither a string nor an integer
    @raise ValueError if system is a string that is not a number
    """
    try:
        string_types, integer_types = basestring, (int, long)
    except NameError:
        # interpreter without basestring/long
        string_types, integer_types = str, int
    if isinstance(system, string_types):
        system = int(system)
    if not isinstance(system, integer_types):
        raise TypeError("system must be either string or integer, not {}"
                .format(system.__class__.__name__))
    # get_target_operating_system() returns (val, text); only the number can
    # be compared with system. Comparing against the whole tuple never
    # matched.
    tos = get_target_operating_system()[0]
    if system == tos:
        return True
    if system == 36:    # linux
        return platform.system().lower() == "linux"
    if 97 <= system <= 100:     # linux 2.x.x
        kernel_prefix = '2.4' if system < 99 else '2.6'
        # even values in this range are the 64-bit variants
        wants_64bit = system % 2 == 0
        return (platform.uname()[2].startswith(kernel_prefix)
                and platform.machine().endswith('64') == wants_64bit)
    return False
def match_pkg(pkg, **kwargs):
    """
    Compare selected attributes of pkg with the given keyword arguments.

    Only arguments that are neither None nor empty take part in the
    comparison; anything else is ignored.
    Recognized arguments:
        name, epoch, version, release, arch
        evra, nevra

    @return True if every considered argument equals the attribute of the
    same name on pkg, False otherwise
    """
    fields = ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch')
    requested = ((field, kwargs.get(field, None)) for field in fields)
    return not any(wanted and getattr(pkg, field) != wanted
                   for field, wanted in requested)
class YumPackage:
    """
    Just a namespace for common functions related to YumPackage provider.
    """

    # Lower-cased rpm architecture name -> number stored in the
    # "Architecture" property. Built once instead of on every call.
    _ARCHITECTURES = {
        "noarch": 0,
        "i386": 4,
        "i586": 5,
        "i686": 6,
        "x86_64": 7,
        "athlon": 8,
        "ia32e": 10,
        "ia64": 11,
        "powerpc": 12,
        "ppc": 12,
        "ppc64": 13,
        "sparc": 15,
        "sparcv9": 16,
        "sparc64": 17,
        "arm": 20,
        "arm6": 21,
        "arm7": 22,
        "arm8": 23,
        "arm9": 24,
        "strongarm": 30,
        "xscale": 31,
        "parisc": 35,
        "parisc11": 36,
        "mips": 40,
        "mips32": 41,
        "mips64": 42,
        "mipsnecvr4xxx": 45,
        "mipsrm7000": 46,
        "mipstx39": 50,
        "mipstx49": 51,
        "v850": 55,
        "alpha": 60,
        "alphaev6": 61,
        "alphaev7": 62,
        "alphaev8": 63,
        "alphaev9": 64,
        "sh": 70,
        "sh2": 71,
        "sh3": 72,
        "sh4": 73,
        "sh5": 74,
    }

    # Lower-cased rpm license tag -> number stored in the "License" property.
    _LICENSES = {
        "freely redistributable without restriction": 2,
        "glide": 5,
        "afl": 6,
        "adobe": 7,
        "mit": 10,
        "mit with advertising": 11,
        "agpl": 15,
        "agplv1": 16,
        "agplv3": 17,
        "agplv3+": 18,
        "agplv3 with exceptions": 19,
        "adsl": 25,
        "amdplpa": 26,
        "asl": 30,
        "asl 1.0": 31,
        "asl 1.1": 32,
        "asl 2.0": 33,
        "artistic clarified": 35,
        "artistic 2.0": 36,
        "arl": 40,
        "aal": 41,
        "bahyph": 42,
        "barr": 43,
        "beerware": 44,
        "beopen": 45,
        "bibtex": 46,
        "boost": 47,
        "borceux": 48,
        "bsd": 50,
        "bsd with advertising": 51,
        "bsd with attribution": 52,
        "bsd protection": 53,
        "ampas bsd": 55,
        "lbnl bsd": 56,
        "ricebsd": 57,
        "catosl": 65,
        "cecill": 66,
        "cecill-b": 67,
        "cecill-c": 68,
        "netscape": 70,
        "cnri": 71,
        "cddl": 72,
        "cpl": 73,
        "condor": 75,
        "copyright only": 76,
        "cpal": 77,
        "gpl": 80,
        "gpl+": 81,
        "gpl+ with exceptions": 82,
        "gplv1": 83,
        "gplv2": 84,
        "gplv2 with exceptions": 85,
        "gplv2+": 86,
        "gplv2+ with exceptions": 87,
        "gplv3": 88,
        "gplv3 with exceptions": 89,
        "gplv3+": 90,
        "gplv3+ with exceptions": 91,
        "gpl+ or artistic": 100,
        "gplv2 or artistic": 101,
        "gplv2+ or artistic": 102,
        "lgpl": 110,
        "lgplv2": 111,
        "lgplv2 with exceptions": 112,
        "lgplv2+": 113,
        "lgplv3": 115,
        "lgplv3 with exceptions": 116,
        "lgplv3+": 117,
        "cc0": 125,
        "crossword": 126,
        "crystal stacker": 127,
        "mpl": 130,
        "mplv1.0": 131,
        "mplv1.1": 132,
        "mplv2.0": 133,
        "diffmark": 135,
        "wtfpl": 136,
        "doc": 137,
        "dotseqn": 138,
        "dsdp": 139,
        "dvipdfm": 140,
        "epl": 141,
        "ecos": 142,
        "ecl 1.0": 145,
        "ecl 2.0": 146,
        "egenix": 150,
        "efl 2.0": 151,
        "entessa": 152,
        "erpl": 153,
        "eu datagrid": 154,
        "eupl 1.1": 155,
        "eurosym": 156,
        "fair": 157,
        "ftl": 158,
        "gitware": 159,
        "gl2ps": 160,
        "glulxe": 161,
        "gnuplot": 162,
        "haskellreport": 163,
        "ibm": 164,
        "imatrix": 165,
        "imagemagick": 166,
        "imlib2": 167,
        "ijg": 168,
        "intel acpi": 169,
        "interbase": 170,
        "isc": 171,
        "jabber": 172,
        "jasper": 173,
        "jpython": 174,
        "julius": 175,
        "knuth": 176,
        # NOTE(review): "lppl" : 177 used to be listed here as well, but the
        # later "lppl" : 429 entry overrode it, so lookups have always
        # returned 429. Behavior is kept; decide which value is intended.
        "latex2e": 178,
        "leptonica": 179,
        "lhcyr": 180,
        "libtiff": 181,
        "logica": 182,
        "makeindex": 183,
        "mecab-ipadic": 184,
        "ms-pl": 185,
        "ms-rl": 186,
        "midnight": 187,
        "miros": 188,
        "mod_macro": 189,
        "motosoto": 190,
        "naumen": 191,
        "ncsa": 192,
        "netcdf": 193,
        "ngpl": 194,
        "nosl": 195,
        "newmat": 196,
        "newsletr": 197,
        "nokia": 198,
        "nlpl": 200,
        "noweb": 201,
        "openldap": 202,
        "oml": 203,
        "openpbs": 204,
        "osl 1.0": 205,
        "osl 1.1": 206,
        "osl 2.0": 207,
        "osl 2.1": 208,
        "osl 3.0": 209,
        "openssl": 215,
        "oreilly": 216,
        "par": 217,
        "phorum": 220,
        "php": 221,
        "plaintex": 222,
        "plexus": 223,
        "postgresql": 224,
        "psfrag": 225,
        "psutils": 226,
        "public domain": 227,
        "python": 228,
        "qhull": 229,
        "qpl": 230,
        "rdisc": 231,
        "rpsl": 232,
        "romio": 233,
        "rsfs": 235,
        "ruby": 236,
        "saxpath": 237,
        "scea": 238,
        "scrip": 239,
        "sendmail": 240,
        "sleepycat": 241,
        "slib": 242,
        "snia": 243,
        "sissl": 244,
        "spl": 245,
        "tcl": 246,
        "teeworlds": 247,
        "tpl": 248,
        "threeparttable": 249,
        "tmate": 250,
        "torquev1.1": 251,
        "tosl": 252,
        "ucd": 253,
        "vim": 254,
        "vnlsl": 255,
        "vostrom": 256,
        "vsl": 257,
        "w3c": 258,
        "webmin": 259,
        "wsuipa": 260,
        "wxwidgets": 261,
        "xinetd": 262,
        "xerox": 263,
        "xpp": 264,
        "xskat": 265,
        "yplv1.1": 266,
        "zed": 270,
        "zend": 271,
        "zplv1.0": 275,
        "zplv2.0": 276,
        "zplv2.1": 277,
        "zlib": 280,
        "zlib with acknowledgement": 281,
        "cdl": 400,
        "cc-by": 401,
        "cc-by-sa": 402,
        "cc-by-nd": 403,
        "dsl": 405,
        "dmtf": 406,
        "oal": 407,
        "fbsddl": 408,
        "gfdl": 409,
        "ieee": 410,
        "ldpl": 411,
        "ofsfdl": 412,
        "open publication": 413,
        "public use": 414,
        "efml": 415,
        "free art": 416,
        "geotratis": 417,
        "green openmusic": 418,
        "ofl": 419,
        "utopia": 420,
        "ams": 421,
        "arphic": 422,
        "baekmuk": 423,
        "bitstream vera": 424,
        "charter": 425,
        "doublestroke": 426,
        "ec": 427,
        "elvish": 428,
        "lppl": 429,
        "hershey": 430,
        "ipa": 431,
        "liberation": 432,
        "lucida": 433,
        "mgopen": 434,
        "mplus": 435,
        "ptfl": 436,
        "punknova": 437,
        "stix": 438,
        "wadalab": 439,
        "xano": 440,
    }

    # Lower-cased rpm group -> number stored in the "Group" property.
    _GROUPS = {
        "amusements/games": 3,
        "amusements/graphics": 4,
        "applications/archiving": 5,
        "applications/communications": 6,
        "applications/databases": 7,
        "applications/editors": 8,
        "applications/emulators": 9,
        "applications/engineering": 10,
        "applications/file": 11,
        "applications/internet": 12,
        "applications/multimedia": 13,
        "applications/productivity": 14,
        "applications/publishing": 15,
        "applications/system": 16,
        "applications/text": 17,
        "development/build tools": 18,
        "development/debug": 19,
        "development/debuggers": 20,
        "development/documentation": 21,
        "development/java": 22,
        "development/languages": 23,
        "development/libraries": 24,
        "development/libraries/java": 25,
        "development/system": 26,
        "development/tools": 27,
        "documentation": 28,
        "internet/www/dynamic content": 29,
        "system/libraries": 30,
        "system environment/base": 31,
        "system environment/daemons": 32,
        "system environment/kernel": 33,
        "system environment/libraries": 34,
        "system environment/shells": 35,
        "text processing/markup/xml": 36,
        "user interface/desktops": 37,
        "user interface/x": 38,
        "user interface/x hardware support": 39,
        "utilities": 40,
    }

    @staticmethod
    def parse_arch(arch, env):
        """
        @param arch architecture name, matched case-insensitively
        @param env provider environment, used for logging
        @return pywbem.Uint16 code of the architecture; 2 (with an error
        logged) if the name is unknown or missing
        """
        try:
            res = YumPackage._ARCHITECTURES[(arch or '').lower()]
        except KeyError:
            logger = env.get_logger()
            logger.log_error("failed to parse architecture '{}'".format(arch))
            res = 2
        return pywbem.Uint16(res)

    @staticmethod
    def parse_license(lic, env):
        """
        @param lic license tag, matched case-insensitively
        @param env provider environment, used for logging
        @return pywbem.Uint16 code of the license; 1 (with an error logged)
        if the tag is unknown or missing
        """
        try:
            res = YumPackage._LICENSES[(lic or '').lower()]
        except KeyError:
            logger = env.get_logger()
            logger.log_error("failed to parse license '{}'".format(lic))
            res = 1
        return pywbem.Uint16(res)

    @staticmethod
    def parse_group(group, env):
        """
        @param group rpm group, matched case-insensitively
        @param env provider environment, used for logging
        @return pywbem.Uint16 code of the group; 2 if the group is missing,
        empty or "unspecified"; 0 (with an error logged) if it is unknown
        """
        try:
            res = YumPackage._GROUPS[(group or '').lower()]
        except KeyError:
            logger = env.get_logger()
            if not group or group.lower() == "unspecified":
                logger.log_info("unspecified group '{}'".format(group))
                res = 2
            else:
                logger.log_error("failed to parse group '{}'".format(group))
                res = 0
        return pywbem.Uint16(res)

    @staticmethod
    def object_path2pkg(env, op):
        """
        Find the installed package described by an object path.

        @param env provider environment, handed to YumDB
        @param op pywbem.CIMInstanceName with keys Name, Version,
        SoftwareElementID (a nevra), SoftwareElementState and
        TargetOperatingSystem
        @return package object taken from yum's list of installed packages
        @raise TypeError if op is not a CIMInstanceName
        @raise pywbem.CIMError (CIM_ERR_NOT_FOUND) if the keys are
        inconsistent or no such package is installed
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be an instance of CIMInstanceName")
        # Presence is checked first: the consistency check below calls
        # str.find() with the version, which fails on a missing value.
        if not op['Name'] or not op['Version']:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    'Both "Name" and "Version" must be given')
        if (  not op['SoftwareElementID']
           or not op['SoftwareElementID'].startswith(op['Name'])
           or op['SoftwareElementID'].find(op['Version']) == -1):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        if op['SoftwareElementState'] not in ("2", 2):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Only \"Executable\" software element state supported")
        if not check_target_operating_system(op['TargetOperatingSystem']):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong target operating system.")
        m = re_nevra.match(op['SoftwareElementID'])
        if not m:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong SotwareElementID. Expected valid nevra"
                    " (name-epoch:version-release.arch).")
        if op['Version'] != m.group('ver'):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Version does not match version part in SoftwareElementID.")
        kwargs = dict((k, m.group(k)) for k in
                ('name', 'epoch', 'ver', 'rel', 'arch'))
        with YumDB.getInstance(env) as yb:
            if not yb.rpmdb.installed(**kwargs):
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "Package not installed")
            pl = yb.doPackageLists('installed')
            exact, matched, unmatched = yum.packages.parsePackages(
                    pl.installed, [op['Name']])
            exact = yum.misc.unique(exact)
            for pkg in exact:
                if pkg.evra == m.group('evra'):
                    break
            else:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "No matching package installed")
        return pkg

    @staticmethod
    def pkg2model_wrapper(namespace, classname):
        """
        @param namespace namespace of newly created object paths
        @param classname class name of newly created models
        @return a function that transforms YumAvailablePackage object
        to CIMInstanceName or CIMInstance object
        """
        # computed once and shared by all calls of the returned function
        tos = pywbem.Uint16(get_target_operating_system()[0])

        def pkg2model(pkg, env, keys_only=True, model=None):
            """
            @param pkg instance of yum.packages.YumAvailablePackage
            @param env provider environment, used for logging
            @param keys_only if True, only key properties are set
            @param model object to fill with data; if None, a new
            CIMInstanceName (keys_only) or CIMInstance is created
            @return the filled model
            @raise TypeError if pkg is not a YumAvailablePackage
            """
            if not isinstance(pkg, yum.packages.YumAvailablePackage):
                raise TypeError(
                        "pkg must be an instance of YumAvailablePackage")
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            model['Name'] = pkg.name
            model['SoftwareElementID'] = pkg.nevra
            model['SoftwareElementState'] = pywbem.Uint16(2)
            model['TargetOperatingSystem'] = tos
            model['Version'] = pkg.version
            if not keys_only:
                model['Caption'] = pkg.summary
                model['Description'] = pkg.description
                model['InstallDate'] = pywbem.CIMDateTime(
                        datetime.fromtimestamp(pkg.installtime))
                if pkg.vendor:
                    model['Manufacturer'] = pkg.vendor
                model['Release'] = pkg.release
                model['Epoch'] = pywbem.Uint16(pkg.epoch)
                model["Architecture"] = YumPackage.parse_arch(pkg.arch, env)
                model['License'] = YumPackage.parse_license(pkg.license, env)
                model['LicenseString'] = pkg.license
                model['Group'] = YumPackage.parse_group(pkg.group, env)
                model['GroupString'] = pkg.group
                model['Size'] = pywbem.Uint64(pkg.size)
            return model

        return pkg2model
class YumFileCheck:
    """
    Just a namespace for functions related to FileCheck provider.
    """

    # One description per flag produced for the "PassedFlags" property, in
    # the same order.
    passed_flags_descriptions = (
        "Existence",
        "File Type",
        "File Size",
        "File Mode",
        "Checksum",
        "Device major/minor number",
        "Symlink Target",
        "User Ownership", "Group Ownership",
        "Modify Time")

    # checksum algorithm name -> number; inverse of yum's RPM_CHECKSUM_TYPES
    checksumtype_str2num = dict((v, k) for (k, v) in
            yum.constants.RPM_CHECKSUM_TYPES.items())

    @staticmethod
    def checksumtype_num2hash(csumt):
        """
        @param csumt number of checksum algorithm (key of
        yum.constants.RPM_CHECKSUM_TYPES)
        @return hashlib constructor of that algorithm
        """
        return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])

    @staticmethod
    def checksumtype_str2pywbem(alg):
        """
        @param alg name of checksum algorithm, matched case-insensitively
        @return pywbem.Uint16 number of the algorithm, 0 if it is unknown
        """
        try:
            res = YumFileCheck.checksumtype_str2num[alg.lower()]
        except KeyError:
            res = 0
        return pywbem.Uint16(res)

    @staticmethod
    def filetype_str2pywbem(ft):
        """
        @param ft file type name: 'file', 'directory', 'symlink', 'fifo',
        'character device' or 'block device'
        @return pywbem.Uint16 number of the file type, 0 if it is unknown
        """
        try:
            return pywbem.Uint16(
                    { 'file'             : 1
                    , 'directory'        : 2
                    , 'symlink'          : 3
                    , 'fifo'             : 4
                    , 'character device' : 5
                    , 'block device'     : 6
                    }[ft])
        except KeyError:
            return pywbem.Uint16(0)

    @staticmethod
    def filetype_mode2pywbem(mode):
        """
        @param mode st_mode value of a file
        @return pywbem.Uint16 number of the file type (same numbering as
        filetype_str2pywbem), 0 if none of the known types applies
        """
        for i, name in enumerate(
                ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
            if getattr(stat, 'S_IS' + name)(mode):
                return pywbem.Uint16(i)
        return pywbem.Uint16(0)

    @staticmethod
    def mode2pywbem_flags(mode):
        """
        @param mode if None, file does not exist
        @return list of pywbem.Uint8 positions of the permission bits set in
        mode (0 = execute by others ... 11 = set-uid), or None if mode is
        None
        """
        if mode is None:
            return None
        bits = (
            stat.S_IXOTH,
            stat.S_IWOTH,
            stat.S_IROTH,
            stat.S_IXGRP,
            stat.S_IWGRP,
            stat.S_IRGRP,
            stat.S_IXUSR,
            stat.S_IWUSR,
            stat.S_IRUSR,
            stat.S_ISVTX,
            stat.S_ISGID,
            stat.S_ISUID)
        return [pywbem.Uint8(i) for i, bit in enumerate(bits) if bit & mode]

    @staticmethod
    def hashfile(afile, hashers, blocksize=65536):
        """
        @param afile file object opened for reading
        @param hashers is a list of hash objects (a single hash object is
        accepted as well)
        @param blocksize number of bytes read at once
        @return list of digest strings (in hex format) for each hash object
        given in the same order
        """
        if not isinstance(hashers, (tuple, list, set, frozenset)):
            hashers = (hashers, )
        buf = afile.read(blocksize)
        while len(buf) > 0:
            for h in hashers:
                h.update(buf)
            buf = afile.read(blocksize)
        return [h.hexdigest() for h in hashers]

    @staticmethod
    def compute_checksums(env, checksum_type, file_type, file_path):
        """
        @param env provider environment, used for logging
        @param checksum_type selected hash algorithm to compute second
        checksum
        @param file_type if it is not a regular file, digests made of
        zeroes are returned
        @param file_path path of the file to read
        @return (md5sum, checksum)
        both checksums are computed from file_path's content
        first one is always md5, the second one depends on checksum_type
        if the file can not be read, (None, None) is returned
        """
        hashers = [hashlib.md5()]
        if checksum_type != YumFileCheck.checksumtype_str2num["md5"]:
            hashers.append(YumFileCheck.checksumtype_num2hash(checksum_type)())
        if file_type != YumFileCheck.filetype_str2pywbem('file'):
            rslts = ['0'*len(h.hexdigest()) for h in hashers]
        else:
            try:
                with open(file_path, 'rb') as f:
                    rslts = YumFileCheck.hashfile(f, hashers)
            except (OSError, IOError) as e:
                env.get_logger().log_error("could not open file \"%s\""
                        " for reading: %s" % (file_path, e))
                return None, None
        # With md5 selected there is a single digest and it is both results.
        # (It used to be returned as the hex string repeated twice, which
        # can never equal an md5 digest.)
        return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0])

    @staticmethod
    def object_path2yumcheck(env, op):
        """
        @param env provider environment, handed to YumDB
        @param op pywbem.CIMInstanceName with keys Name (file name),
        Version, CheckID, SoftwareElementID (a nevra), SoftwareElementState
        and TargetOperatingSystem
        @return instance of yum.packages._RPMVerifyPackage
        this object holds YumInstalledPackage under its po attribute
        @raise TypeError if op is not a CIMInstanceName
        @raise pywbem.CIMError (CIM_ERR_NOT_FOUND) if the keys are
        inconsistent, the package is not installed or does not own the file
        """
        if not isinstance(op, pywbem.CIMInstanceName):
            raise TypeError("op must be instance of CIMInstanceName, "
                    "not \"%s\"" % op.__class__.__name__)
        # Presence is checked first: the consistency check below calls
        # str.find() with the version, which fails on a missing value.
        if not op['Name'] or not op['Version']:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    'Both "Name" and "Version" must be given')
        if (  not op['SoftwareElementID']
           or not op['CheckID']
           or not op['CheckID'].endswith('#'+op['Name'])
           or op['SoftwareElementID'].find(op['Version']) == -1):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
        if op['SoftwareElementState'] not in ("2", 2):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Only \"Executable\" software element state supported")
        if not check_target_operating_system(op['TargetOperatingSystem']):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong target operating system.")
        m = re_nevra.match(op['SoftwareElementID'])
        if not m:
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Wrong SotwareElementID. Expected valid nevra"
                    " (name-epoch:version-release.arch).")
        if op['Version'] != m.group('ver'):
            raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                    "Version does not match version part in SoftwareElementID.")
        kwargs = dict((k, m.group(k)) for k in
                ('name', 'epoch', 'ver', 'rel', 'arch'))
        with YumDB.getInstance(env) as yb:
            if not yb.rpmdb.installed(**kwargs):
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "Package not installed")
            pl = yb.doPackageLists('installed')
            exact, matched, unmatched = yum.packages.parsePackages(
                    pl.installed, [kwargs['name']])
            exact = yum.misc.unique(exact)
            for pkg in exact:
                if pkg.evra == m.group('evra'):
                    break
            else:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "No matching package installed")
            fi = pkg.hdr.fiFromHeader()
            vpkg = yum.packages._RPMVerifyPackage(
                    pkg, fi, pkg.yumdb_info.checksum_type, [], True)
            if op['Name'] not in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File not found in RPM package")
        return vpkg

    # Named tuple to store results of rpm file check as pywbem values,
    # all results are in form:
    #   (expected, reality)
    # where
    #   expected is value from rpm package
    #   reality  is value obtained from installed file
    # None means, that value could not be obtained
    # except for "exists" and "md5_checksum" attributes, where "exists"
    # is boolean and md5_checksum is a string
    # for example:
    #   file_check.file_type == (4, 3)
    FileCheck = collections.namedtuple('FileCheck',
        'exists, md5_checksum, file_type, file_size, file_mode, '
        'checksum, device, link_target, user_id, group_id, '
        'last_modification_time')

    @staticmethod
    def _user2uid(user):
        """
        @return pywbem.Uint32 uid of the named user, None if the system has
        no such user
        """
        try:
            return pywbem.Uint32(pwd.getpwnam(user).pw_uid)
        except (KeyError, TypeError):
            return None

    @staticmethod
    def _group2gid(group):
        """
        @return pywbem.Uint32 gid of the named group, None if the system has
        no such group
        """
        try:
            return pywbem.Uint32(grp.getgrnam(group).gr_gid)
        except (KeyError, TypeError):
            return None

    @staticmethod
    def test_file(env, checksum_type, vpf):
        """
        Compare what the rpm package says about a file with the file found
        on disk.

        @param env provider environment, used for logging
        @param checksum_type is a pywbem value for ChecksumType property
        @param vpf instance of yum.packages._RPMVerifyPackageFile
        @return instance of FileCheck
        @raise TypeError if vpf is not a _RPMVerifyPackageFile
        """
        if not isinstance(vpf, yum.packages._RPMVerifyPackageFile):
            raise TypeError("vpf must be an instance of _RPMVerifyPackageFile,"
                    " not \"%s\"" % vpf.__class__.__name__)
        expected = {
            "file_type"   : YumFileCheck.filetype_str2pywbem(vpf.ftype),
            # owners unknown to this system yield None instead of KeyError
            "user_id"     : YumFileCheck._user2uid(vpf.user),
            "group_id"    : YumFileCheck._group2gid(vpf.group),
            "file_mode"   : pywbem.Uint32(vpf.mode),
            "file_size"   : pywbem.Uint64(vpf.size),
            "link_target" : vpf.readlink if vpf.readlink else None,
            "checksum"    : vpf.digest[1],
            "device"      : (pywbem.Uint64(vpf.dev)
                if vpf.ftype.endswith('device') else None),
            "last_modification_time" : pywbem.Uint64(vpf.mtime)
        }
        # A single lstat() answers both "does it exist" and "what is it", so
        # the file can not vanish between the two questions.
        try:
            fstat = os.lstat(vpf.filename)
        except OSError:
            fstat = None
        exists = fstat is not None
        md5_checksum = None
        reality = {}    # stays empty (all values None) for a missing file
        if exists:
            file_type = YumFileCheck.filetype_mode2pywbem(fstat.st_mode)
            # The type used to be compared with filetype_str2pywbem("device"),
            # which is 0 (unknown), so real device nodes never matched.
            device_types = (
                YumFileCheck.filetype_str2pywbem('character device'),
                YumFileCheck.filetype_str2pywbem('block device'))
            reality = {
                "file_type" : file_type,
                "user_id"   : pywbem.Uint32(fstat.st_uid),
                "group_id"  : pywbem.Uint32(fstat.st_gid),
                "file_mode" : pywbem.Uint32(fstat.st_mode),
                "file_size" : pywbem.Uint64(fstat.st_size),
                "last_modification_time" : pywbem.Uint64(fstat.st_mtime),
                # NOTE(review): st_dev is the device holding the file;
                # confirm whether vpf.dev is meant to be compared with it or
                # with st_rdev.
                "device"    : (pywbem.Uint64(fstat.st_dev)
                    if file_type in device_types else None),
                "link_target" : (os.readlink(vpf.filename)
                    if stat.S_ISLNK(fstat.st_mode) else None),
            }
            md5_checksum, reality["checksum"] = \
                    YumFileCheck.compute_checksums(
                            env, checksum_type, file_type, vpf.filename)
        results = dict((key, (expected[key], reality.get(key)))
                for key in expected)
        return YumFileCheck.FileCheck(
                exists=exists, md5_checksum=md5_checksum, **results)

    @staticmethod
    def filecheck_passed(fc):
        """
        @param fc instance of FileCheck
        @return True if the file exists and every (expected, reality) pair
        of fc holds two equal values
        @raise TypeError if fc is not a FileCheck
        """
        if not isinstance(fc, YumFileCheck.FileCheck):
            raise TypeError("fc must be an instance of FileCheck")
        return fc.exists and all(v[0] == v[1]
                for v in fc if isinstance(v, tuple))

    @staticmethod
    def filecheck_wrapper(namespace, classname):
        """
        @param namespace namespace of newly created object paths
        @param classname class name of newly created models
        @return a function that fills a CIMInstanceName or CIMInstance with
        the check results of one file of a package
        """

        def _filecheck2model_flags(fc):
            """
            @return list of booleans, one per entry of
            passed_flags_descriptions: existence first, then one flag per
            (expected, reality) pair in the field order of FileCheck
            """
            flags = []
            for v in fc:
                if isinstance(v, tuple):
                    flags.append(fc.exists and v[0] == v[1])
                elif isinstance(v, bool):
                    flags.append(v)
            return flags

        def filecheck2model(vpkg, fn, env, keys_only=True, model=None):
            """
            @param vpkg instance of yum.packages._RPMVerifyPackage
            @param fn name of a file belonging to vpkg
            @param env provider environment, used for logging
            @param keys_only if True, only key properties are set
            @param model object to fill with data; if None, a new
            CIMInstanceName (keys_only) or CIMInstance is created
            @return the filled model
            @raise TypeError if vpkg is not a _RPMVerifyPackage
            @raise pywbem.CIMError (CIM_ERR_NOT_FOUND) if fn is not a file
            of vpkg
            """
            if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
                raise TypeError(
                        "vpkg must be an instance of _RPMVerifyPackage")
            if fn not in vpkg:
                raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
                        "File \"%s\" not found among package files" % fn)
            if model is None:
                model = pywbem.CIMInstanceName(classname, namespace=namespace)
                if not keys_only:
                    model = pywbem.CIMInstance(classname, path=model)
            pkg = vpkg.po
            vpf = vpkg._files[fn]
            model['Name'] = vpf.filename
            model['SoftwareElementID'] = pkg.nevra
            model['SoftwareElementState'] = pywbem.Uint16(2)
            model['TargetOperatingSystem'] = pywbem.Uint16(
                    get_target_operating_system()[0])
            model['Version'] = pkg.version
            model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
            if not keys_only:
                # TODO: Caption, CheckMode, CRC1, CRC2, CreateTimeStamp,
                # Description, ElementName and InstanceID are not filled yet
                model['FileName'] = os.path.basename(vpf.filename)
                model['ChecksumType'] = csumt = \
                        YumFileCheck.checksumtype_str2pywbem(
                                pkg.yumdb_info.checksum_type)
                fc = YumFileCheck.test_file(env, csumt, vpf)
                for mattr, fattr in (
                        ('FileType', 'file_type'),
                        ('FileUserID', 'user_id'),
                        ('FileGroupID', 'group_id'),
                        ('FileMode', 'file_mode'),
                        ('LastModificationTime', 'last_modification_time'),
                        ('FileSize', 'file_size'),
                        ('LinkTarget', 'link_target'),
                        ('Checksum', 'checksum')):
                    exp, rea = getattr(fc, fattr)
                    # values that could not be obtained are left unset
                    if exp is not None:
                        model['Expected' + mattr] = exp
                    if rea is not None:
                        model[mattr] = rea
                model['ExpectedFileModeFlags'] = \
                        YumFileCheck.mode2pywbem_flags(fc.file_mode[0])
                if fc.exists:
                    model['FileModeFlags'] = YumFileCheck.mode2pywbem_flags(
                            fc.file_mode[1])
                model['FileExists'] = fc.exists
                if fc.md5_checksum is not None:
                    model['MD5Checksum'] = fc.md5_checksum
                model['PassedFlags'] = _filecheck2model_flags(fc)
                model['PassedFlagsDescriptions'] = list(
                        YumFileCheck.passed_flags_descriptions)
            return model

        return filecheck2model