summaryrefslogtreecommitdiffstats
path: root/src/software/providers/util/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/providers/util/common.py')
-rw-r--r--src/software/providers/util/common.py804
1 files changed, 804 insertions, 0 deletions
diff --git a/src/software/providers/util/common.py b/src/software/providers/util/common.py
new file mode 100644
index 0000000..4190d41
--- /dev/null
+++ b/src/software/providers/util/common.py
@@ -0,0 +1,804 @@
+# -*- encoding: utf-8 -*-
+# Software Management Providers
+#
+# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""Common utilities for LMI_Software* providers
+"""
+
+import collections
+from datetime import datetime
+import grp
+import hashlib
+import itertools
+import os
+import platform
+import pwd
+import re
+import rpm
+import socket
+import stat
+import pywbem
+import yum
+import cmpi_pywbem_bindings as pycimmb
+import singletonmixin
+
+re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
+re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
+ r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+
+rpmdb_path = '/var/lib/rpm/Packages'
+
+class YumDB(singletonmixin.Singleton):
+
+ ignoreSubsequent = True
+
+ def __init__(self, env, *args, **kwargs):
+ if not isinstance(env, pycimmb.ProviderEnvironment):
+ raise TypeError("env must be instance of"
+ " pycimmb.ProviderEnvironment")
+ self._yum_args = (args, kwargs)
+ self._yum = None
+ self._db_mtime = 0
+ self._lock_cnt = 0
+ self.env = env
+ env.get_logger().log_info('init called')
+
+ def is_dirty(self):
+ return self._db_mtime < os.stat(rpmdb_path).st_mtime
+
+ def is_locked(self):
+ return self._yum._lockfile is not None
+
+ def update_db(self):
+ self.env.get_logger().log_info('updating rpmdb')
+ self._db_mtime = os.stat(rpmdb_path).st_mtime
+ self._yum.doConfigSetup()
+ self._yum.doTsSetup()
+ self._yum.doRpmDBSetup()
+
+ def __enter__(self):
+ self._lock_cnt += 1
+ if self._lock_cnt < 2:
+ self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1])
+ if not self.is_locked() and self.is_dirty():
+ self.update_db()
+ self._yum.doLock()
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ if self._lock_cnt == 1:
+ del self._yum
+ self._lock_cnt -= 1
+
+ def __getattr__(self, name):
+ if not self.is_locked() and self.is_dirty():
+ self.update_db()
+ return getattr(self._yum, name)
+
+
+def _get_distname():
+ if hasattr(platform, 'linux_distribution'):
+ return platform.linux_distribution(
+ full_distribution_name=False)[0].lower()
+ else:
+ return platform.dist()[0].lower()
+
+
+def get_target_operating_system():
+ """
+ @return (val, text).
+ Where val is a number from ValueMap of TargetOperatingSystem property
+ of CIM_SoftwareElement class and text is its testual representation.
+ """
+
+ system = platform.system()
+ if system.lower() == 'linux':
+ try:
+ val, dist = \
+ { 'redhat' : (79, 'RedHat Enterprise Linux')
+ , 'suse' : (81, 'SUSE')
+ , 'mandriva' : (88, 'Mandriva')
+ , 'ubuntu' : (93, 'Ubuntu')
+ , 'debian' : (95, 'Debian')
+ }[_get_distname()]
+ except KeyError:
+ linrel = platform.uname()[2]
+ if linrel.startswith('2.4'):
+ val, dist = (97, 'Linux 2.4.x')
+ elif linrel.startswith('2.6'):
+ val, dist = (99, 'Linux 2.6.x')
+ else:
+ return (36, 'LINUX') # no check for x86_64
+ if platform.machine() == 'x86_64':
+ val += 1
+ dist += ' 64-Bit'
+ return (val, dist)
+ elif system.lower() in ('macosx', 'darwin'):
+ return (2, 'MACOS')
+ # elif system.lower() == 'windows': # no general value
+ else:
+ return (0, 'Unknown')
+
+def get_computer_system_op():
+ return pywbem.CIMInstanceName(
+ classname='CIM_ComputerSystem',
+ keybindings={
+ "CreationClassName": "CIM_ComputerSystem"
+ , "Name" : socket.gethostname() },
+ namespace="root/cimv2")
+
+def check_target_operating_system(system):
+ """
+ @return if param system matches current target operating system
+ """
+ if isinstance(system, basestring):
+ system = int(system)
+ if not isinstance(system, (int, long)):
+ raise TypeError("system must be either string or integer, not {}"
+ .format(system.__class__.__name__))
+ tos = get_target_operating_system()
+ if system == tos: return True
+ if system == 36: # linux
+ if platform.system().lower() == "linux": return True
+ if ( system >= 97 and system <= 100 # linux 2.x.x
+ and platform.uname()[2].startswith('2.4' if system < 99 else '2.6')
+ # check machine
+ and ( bool(platform.machine().endswith('64'))
+ == bool(not (system % 2)))):
+ return True
+ return False
+
+def check_computer_system_op(env, system):
+ if not isinstance(system, pywbem.CIMInstanceName):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "\"System\" must be a CIMInstanceName")
+ our_system = get_computer_system_op()
+ ch = env.get_cimom_handle()
+ if not ch.is_subclass(system.namespace,
+ sub=system.classname,
+ super=our_system.classname):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Class of \"System\" must be a sublass of %s" %
+ our_system.classname)
+ if not 'CreationClassName' in system or not 'Name' in system:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "\"System\" is missing one of keys")
+ if not ch.is_subclass(system.namespace,
+ sub=system['CreationClassName'],
+ super=our_system.classname):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "CreationClassName of \"System\" must be a sublass of %s" %
+ our_system.classname)
+ if system['Name'] != our_system['Name']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Name of \"System\" does not match \"%s\"" %
+ our_system['Name'])
+ return True
+
+
+def match_pkg(pkg, **kwargs):
+ """
+ all not Null and not empty arguments will be matched against pkg
+ attributes; if all of them match, return True
+
+ possible arguments:
+ name, epoch, version, release, arch
+ evra, nevra
+ """
+ for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'):
+ v = kwargs.get(a, None)
+ if v and getattr(pkg, a) != v:
+ return False
+ return True
+
+class SoftwarePackage:
+ """
+ Just a namespace for common function related to SoftwarePackage provider.
+ """
+
+ @staticmethod
+ def parse_group(group, env):
+ try:
+ res = \
+ { "amusements/games" : 3
+ , "amusements/graphics" : 4
+ , "applications/archiving" : 5
+ , "applications/communications" : 6
+ , "applications/databases" : 7
+ , "applications/editors" : 8
+ , "applications/emulators" : 9
+ , "applications/engineering" : 10
+ , "applications/file" : 11
+ , "applications/internet" : 12
+ , "applications/multimedia" : 13
+ , "applications/productivity" : 14
+ , "applications/publishing" : 15
+ , "applications/system" : 16
+ , "applications/text" : 17
+ , "development/build tools" : 18
+ , "development/debug" : 19
+ , "development/debuggers" : 20
+ , "development/documentation" : 21
+ , "development/java" : 22
+ , "development/languages" : 23
+ , "development/libraries" : 24
+ , "development/libraries/java" : 25
+ , "development/system" : 26
+ , "development/tools" : 27
+ , "documentation" : 28
+ , "internet/www/dynamic content" : 29
+ , "system/libraries" : 30
+ , "system environment/base" : 31
+ , "system environment/daemons" : 32
+ , "system environment/kernel" : 33
+ , "system environment/libraries" : 34
+ , "system environment/shells" : 35
+ , "text processing/markup/xml" : 36
+ , "user interface/desktops" : 37
+ , "user interface/x" : 38
+ , "user interface/x hardware support" : 39
+ , "utilities" : 40
+ }[group.lower()]
+ except KeyError:
+ logger = env.get_logger()
+ if not group or group.lower() == "unspecified":
+ logger.log_info("unspecified group '{}'".format(group))
+ res = 2
+ else:
+ logger.log_error("failed to parse group '{}'".format(group))
+ res = 0
+ return pywbem.Uint16(res)
+
+ @staticmethod
+ def object_path2pkg(env, op, package_list='installed'):
+ """
+ @param op must contain precise information of package,
+ otherwise a CIM_ERR_NOT_FOUND error is raised
+ @param package_list one of {'installed', 'all', 'available'}
+ says, where to look for given package
+ """
+ if not isinstance(op, pywbem.CIMInstanceName):
+ raise TypeError("op must be an instance of CIMInstanceName")
+ if not isinstance(package_list, basestring):
+ raise TypeError("package_list must be a string")
+ if not package_list in ('installed', 'all', 'available'):
+ raise ValueError('unsupported package list "%s"'%package_list)
+
+ tos = get_target_operating_system()[0]
+ if ( not op['Name'] or not op['SoftwareElementID']
+ or not op['SoftwareElementID'].startswith(op['Name'])
+ or op['SoftwareElementID'].find(op['Version']) == -1):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
+# if op['SoftwareElementState'] not in ("2", 2):
+# raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+# "Only \"Executable\" software element state supported")
+ if not check_target_operating_system(op['TargetOperatingSystem']):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Wrong target operating system.")
+ if not op['Name'] or not op['Version']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ 'Both "Name" and "Version" must be given')
+ m = re_nevra.match(op['SoftwareElementID'])
+ if not m:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Wrong SotwareElementID. Expected valid nevra"
+ " (name-[epoch:]version-release.arch).")
+ if op['Version'] != m.group('ver'):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Version does not match version part in SoftwareElementID.")
+ evra = "{}:{}-{}.{}".format(*(
+ (m.group(k) if k != "epoch" or m.group(k) else "0")
+ for k in ("epoch", 'ver', 'rel', 'arch')))
+ with YumDB.getInstance(env) as yb:
+ pl = yb.doPackageLists(package_list,
+ showdups=package_list != 'installed')
+ if package_list != 'all':
+ pl = getattr(pl, package_list)
+ else:
+ # NOTE: available ∩ installed = ∅
+ pl = itertools.chain(pl.available, pl.installed)
+ exact,_,_ = yum.packages.parsePackages(pl, [op['Name']])
+ for pkg in yum.misc.unique(exact):
+ if pkg.evra == evra: break
+ else:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No matching package found.")
+ return pkg
+
+ @staticmethod
+ def object_path2pkg_search(env, op):
+ """
+ similar to object_path2pkg, but tries to find best suitable
+ package matching keys
+
+ If any matching package is already installed, it is returned.
+ Otherwise available package with highest version is returned.
+
+ @param op may be object of CIMInstance or CIMInstanceName and
+ must contain at least \"Name\" or \"SoftwareElementID\"
+ @return instance of yum.rpmsack.RPMInstalledPackage in case of
+ installed package, otherwise yum.packages.YumAvailablePackage
+ """
+ logger = env.get_logger()
+ if isinstance(op, pywbem.CIMInstance):
+ def _get_key(k):
+ v = op.properties.get(k, None)
+ if isinstance(v, pywbem.CIMProperty): return v.value
+ if v is not None: return v
+ logger.log_error('missing key "{}" in inst.props'.format(k))
+ return op.path[k] if k in op.path else None
+ elif isinstance(op, pywbem.CIMInstanceName):
+ _get_key = lambda k: op[k] if k in op else None
+ else:
+ raise TypeError("op must be either CIMInstance or CIMInstanceName")
+
+ # parse and check arguments
+ match_props = {} # args for match_pkg
+ if _get_key('SoftwareElementID'):
+ m = re_nevra.match(_get_key('SoftwareElementID'))
+ if not m:
+ raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
+ "SoftwareElementID could not be parsed.")
+ match_props['nevra'] = _get_key('SoftwareElementID')
+ match_props['name'] = m.group('name')
+ else:
+ for matchattr, instattr in (
+ ('name', 'name'), ('epoch', 'epoch'), ('version', 'ver'),
+ ('release', 'rel'), ('arch', 'arch')):
+ if _get_key(matchattr):
+ match_props[matchattr] = _get_key(matchattr)
+
+ if not match_props:
+ raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
+ "Too few key values given (give at least a Name).")
+ if not 'name' in match_props and not 'nevra' in match_props:
+ raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
+ "Missing either Name or SoftwareElementID property.")
+
+ # get available packages
+ pl = YumDB.getInstance(env).doPackageLists('all', showdups=True)
+ # NOTE: available ∩ installed = ∅
+ exact,_,_ = yum.packages.parsePackages(
+ itertools.chain(pl.available, pl.installed),
+ [match_props['name']])
+ exact = yum.misc.unique(exact)
+ exact_orig = exact
+ exact = sorted( [ p for p in exact if match_pkg(p, **match_props) ]
+ , key=lambda a: a.evra)
+ if len(exact) == 0:
+ logger.log_error('could not find any package for query: {}'
+ ' in list: {}'
+ .format(match_props, [p.nevra for p in exact_orig]))
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No matching package found.")
+ for pkg in exact: # check, whether package is already installed
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ return pkg
+ logger.log_info(('found multiple matching packages'
+ ' for query: {}' if len(exact) > 1 else
+ 'exact match found for query: {}')
+ .format(match_props))
+ return exact[-1] # select highest version
+
+ @staticmethod
+ def pkg2model_wrapper(namespace, classname):
+ """
+ @return a function that transforms YumAvailablePackage object
+ to CIMInstanceName object
+ """
+
+ tos = pywbem.Uint16(get_target_operating_system()[0])
+
+ def pkg2model(env, pkg, keys_only=True, model=None):
+ """
+ @param model if None, will be filled with data, otherwise
+ a new instance of CIMInstance or CIMObjectPath is created
+ """
+ #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ #if not isinstance(pkg, yum.packages.YumHeaderPackage):
+ if not isinstance(pkg, yum.packages.YumAvailablePackage):
+ raise TypeError(
+ "pkg must be an instance of YumAvailablePackage")
+ if model is None:
+ model = pywbem.CIMInstanceName(classname, namespace=namespace)
+ if not keys_only:
+ model = pywbem.CIMInstance(classname, path=model)
+ if isinstance(model, pywbem.CIMInstance):
+ def _set_key(k, v):
+ model[k] = v
+ model.path[k] = v
+ else:
+ _set_key = model.__setitem__
+ with YumDB.getInstance(env):
+ _set_key('Name', pkg.name)
+ _set_key('SoftwareElementID', pkg.nevra)
+ _set_key('SoftwareElementState', pywbem.Uint16(2
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
+ else 1))
+ _set_key('TargetOperatingSystem', tos)
+ _set_key('Version', pkg.version)
+ if not keys_only:
+ model['Caption'] = pkg.summary
+ model['Description'] = pkg.description
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ model['InstallDate'] = pywbem.CIMDateTime(
+ datetime.fromtimestamp(pkg.installtime))
+ if pkg.vendor:
+ model['Manufacturer'] = pkg.vendor
+ model['Release'] = pkg.release
+ model['Epoch'] = pywbem.Uint16(pkg.epoch)
+ model["Architecture"] = pkg.arch
+ model['License'] = pkg.license
+ model['Group'] = pkg.group
+ model['Size'] = pywbem.Uint64(pkg.size)
+ return model
+
+ return pkg2model
+
+class SoftwareFileCheck:
+ """
+ Just a namespace for functions related to FileCheck provider.
+ """
+
+ passed_flags_descriptions = (
+ "Existence",
+ "File Type",
+ "File Size",
+ "File Mode",
+ "Checksum",
+ "Device major/minor number",
+ "Symlink Target",
+ "User Ownership", "Group Ownership",
+ "Modify Time")
+
+ checksumtype_str2num = dict((v, k) for (k, v) in
+ yum.constants.RPM_CHECKSUM_TYPES.items())
+
+ @staticmethod
+ def pkg_checksum_type(pkg):
+ """
+ @return integer representation of checksum type
+ """
+ if not isinstance(pkg, yum.packages.YumAvailablePackage):
+ raise TypeError("pkg must be an instance of YumAvailablePackage")
+ if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
+ with self: # ensure, that _yum is inited
+ return SoftwareFileCheck.checksumtype_str2pywbem(
+ pkg.yumdb_info.checksum_type)
+
+ @staticmethod
+ def checksumtype_num2hash(csumt):
+ return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])
+
+ @staticmethod
+ def checksumtype_str2pywbem(alg):
+ try:
+ res = SoftwareFileCheck.checksumtype_str2num[alg.lower()]
+ except KeyError: res = 0
+ return pywbem.Uint16(res)
+
+ @staticmethod
+ def filetype_str2pywbem(ft):
+ try:
+ return pywbem.Uint16(
+ { 'file' : 1
+ , 'directory' : 2
+ , 'symlink' : 3
+ , 'fifo' : 4
+ , 'character device' : 5
+ , 'block device' : 6
+ }[ft])
+ except KeyError:
+ return pywbem.Uint16(0)
+
+ @staticmethod
+ def filetype_mode2pywbem(mode):
+ for i, name in enumerate(
+ ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
+ if getattr(stat, 'S_IS' + name)(mode):
+ return pywbem.Uint16(i)
+ return pywbem.Uint16(0)
+
+ @staticmethod
+ def mode2pywbem_flags(mode):
+ """
+ @param mode if None, file does not exist
+ """
+ if mode is None: return None
+ flags = []
+ for i, c in enumerate((
+ stat.S_IXOTH,
+ stat.S_IWOTH,
+ stat.S_IROTH,
+ stat.S_IXGRP,
+ stat.S_IWGRP,
+ stat.S_IRGRP,
+ stat.S_IXUSR,
+ stat.S_IWUSR,
+ stat.S_IRUSR,
+ stat.S_ISVTX,
+ stat.S_ISGID,
+ stat.S_ISUID)):
+ if c & mode:
+ flags.append(pywbem.Uint8(i))
+ return flags
+
+ @staticmethod
+ def hashfile(afile, hashers, blocksize=65536):
+ """
+ @param hashers is a list of hash objects
+ @return list of digest strings (in hex format) for each hash object
+ given in the same order
+ """
+ if not isinstance(hashers, (tuple, list, set, frozenset)):
+ hashers = (hashers, )
+ buf = afile.read(blocksize)
+ while len(buf) > 0:
+ for h in hashers: h.update(buf)
+ buf = afile.read(blocksize)
+ return [ h.hexdigest() for h in hashers ]
+
+ @staticmethod
+ def compute_checksums(env, checksum_type, file_type, file_path):
+ """
+ @param file_type is not a file, then zeroes are returned
+ @param checksum_type selected hash algorithm to compute second
+ checksum
+ @return (md5sum, checksum)
+ both checksums are computed from file_path's content
+ first one is always md5, the second one depends on checksum_type
+ if file does not exists, (None, None) is returned
+ """
+ hashers = [hashlib.md5()]
+ if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]:
+ hashers.append(SoftwareFileCheck.checksumtype_num2hash(checksum_type)())
+ if file_type != SoftwareFileCheck.filetype_str2pywbem('file'):
+ rslts = ['0'*len(h.hexdigest()) for h in hashers]
+ else:
+ try:
+ with open(file_path, 'rb') as f:
+ rslts = SoftwareFileCheck.hashfile(f, hashers)
+ except (OSError, IOError) as e:
+ env.get_logger().log_error("could not open file \"%s\""
+ " for reading: %s" % (file_path, e))
+ return None, None
+ return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)
+
+ @staticmethod
+ def object_path2yumcheck(env, op):
+ """
+ @return instance of yum.packages._RPMVerifyPackage
+ this object holds RPMInstalledPackage under its po attribute
+ """
+ if not isinstance(op, pywbem.CIMInstanceName):
+ raise TypeError("op must be instance of CIMInstanceName, "
+ "not \"%s\"" % op.__class__.__name__)
+ log = env.get_logger()
+
+ tos = get_target_operating_system()[0]
+ if ( not op['Name'] or not op['SoftwareElementID']
+ or not op['CheckID']
+ or not op['CheckID'].endswith('#'+op['Name'])
+ or op['SoftwareElementID'].find(op['Version']) == -1):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
+ if op['SoftwareElementState'] not in ("2", 2):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Only \"Executable\" software element state supported")
+ if not check_target_operating_system(op['TargetOperatingSystem']):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Wrong target operating system.")
+ if not op['Name'] or not op['Version']:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ 'Both "Name" and "Version" must be given')
+ m = re_nevra.match(op['SoftwareElementID'])
+ if not m:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Wrong SotwareElementID. Expected valid nevra"
+ " (name-epoch:version-release.arch).")
+ if op['Version'] != m.group('ver'):
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "Version does not match version part in SoftwareElementID.")
+
+ evra = "{}:{}-{}.{}".format(*(
+ (m.group(k) if k != "epoch" or m.group(k) else "0")
+ for k in ("epoch", 'ver', 'rel', 'arch')))
+ with YumDB.getInstance(env) as yb:
+ pl = yb.doPackageLists('installed')
+ exact, matched, unmatched = yum.packages.parsePackages(
+ pl.installed, [m.group('name')])
+ exact = yum.misc.unique(exact)
+ for pkg in exact:
+ if pkg.evra == evra: break
+ else:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "No matching package installed.")
+
+ fi = pkg.hdr.fiFromHeader()
+ vpkg = yum.packages._RPMVerifyPackage(pkg, fi,
+ SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
+ if not op['Name'] in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File not found in RPM package.")
+ return vpkg
+
+ """
+ Named tuple to store results of rpm file check as pywbem values,
+ all results are in form:
+ (expected, reality)
+ where
+ expected is value from rpm package
+ reality is value obtained from installed file
+ None means, that value could not be obtained
+ except for "exists" and "md5_checksum" attributes, where "exists"
+ is boolean and md5_checksum is a string
+ for example:
+ file_check.file_type == (4, 3)
+ """
+ FileCheck = collections.namedtuple('FileCheck',
+ 'exists, md5_checksum, file_type, file_size, file_mode, '
+ 'checksum, device, link_target, user_id, group_id, '
+ 'last_modification_time')
+
+ @staticmethod
+ def test_file(env, checksum_type, vpf):
+ """
+ @param checksum type is a pywbem value for ChecksumType property
+ @return instance of FileCheck
+ """
+ if not isinstance(vpf, yum.packages._RPMVerifyPackageFile):
+ raise TypeError("vpf must be an instance of _RPMVerifyPackage,"
+ " not \"%s\"" % vpf.__class__.__name__)
+ exists = os.path.lexists(vpf.filename)
+ md5_checksum = None
+ expected = {
+ "file_type" : SoftwareFileCheck.filetype_str2pywbem(vpf.ftype),
+ "user_id" : pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid),
+ "group_id" : pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid),
+ "file_mode" : pywbem.Uint32(vpf.mode),
+ "file_size" : pywbem.Uint64(vpf.size),
+ "link_target" : vpf.readlink if vpf.readlink else None,
+ "checksum" : vpf.digest[1],
+ "device" : (pywbem.Uint64(vpf.dev)
+ if vpf.ftype.endswith('device') else None),
+ "last_modification_time" : pywbem.Uint64(vpf.mtime)
+ }
+ if not exists:
+ reality = collections.defaultdict(lambda: None)
+ else:
+ fstat = os.lstat(vpf.filename)
+ reality = {
+ "file_type" : SoftwareFileCheck.filetype_mode2pywbem(fstat.st_mode),
+ "user_id" : pywbem.Uint32(fstat.st_uid),
+ "group_id" : pywbem.Uint32(fstat.st_gid),
+ "file_mode" : pywbem.Uint32(fstat.st_mode),
+ "file_size" : pywbem.Uint64(fstat.st_size),
+ "last_modification_time" : pywbem.Uint64(fstat.st_mtime)
+ }
+ reality["device"] = (pywbem.Uint64(fstat.st_dev)
+ if reality['file_type'] == SoftwareFileCheck.filetype_str2pywbem(
+ "device") else None)
+ reality["link_target"] = (os.readlink(vpf.filename)
+ if os.path.islink(vpf.filename) else None)
+ md5_checksum, checksum = SoftwareFileCheck.compute_checksums(
+ env, checksum_type, reality["file_type"], vpf.filename)
+ reality["checksum"] = checksum
+ kwargs = dict(exists=exists, md5_checksum=md5_checksum,
+ **dict((k, (expected[k], reality[k])) for k in expected))
+ return SoftwareFileCheck.FileCheck(**kwargs)
+
+ @staticmethod
+ def filecheck_passed(fc):
+ if not isinstance(fc, SoftwareFileCheck.FileCheck):
+ raise TypeError("fc must be an instance of FileCheck")
+ return ( fc.exists
+ and all( v[0] == v[1]
+ for k, v in fc._asdict().items() if (
+ isinstance(v, tuple)
+ and ( k != "last_modification_time"
+ or fc.file_type[0] == \
+ SoftwareFileCheck.filetype_str2pywbem("file")
+ ))))
+
+ @staticmethod
+ def filecheck_wrapper(namespace, classname):
+
+ def _filecheck2model_flags(fc):
+ flags = []
+ for k, v in fc._asdict().items():
+ if isinstance(v, tuple):
+ if ( k != "last_modification_time"
+ or fc.file_type[0] == SoftwareFileCheck.filetype_str2pywbem(
+ 'file')):
+ # last_modification_time check is valid only for
+ # regular files
+ flag = fc.exists and v[0] == v[1]
+ else:
+ flag = True
+ flags.append(flag)
+ elif isinstance(v, bool):
+ flags.append(v)
+ return flags
+
+ def filecheck2model(vpkg, fn, env, keys_only=True,
+ model=None, fc=None):
+ if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
+ raise TypeError(
+ "vpkg must be an instance of _RPMVerifyPackage")
+ if not fn in vpkg:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "File \"%s\" not found among package files" % fn)
+ if model is None:
+ model = pywbem.CIMInstanceName(classname, namespace=namespace)
+ if not keys_only:
+ model = pywbem.CIMInstance(classname, path=model)
+ if fc is not None:
+ if not isinstance(fc, SoftwareFileCheck.FileCheck):
+ raise TypeError("fc must be an instance of FileCheck")
+ pkg = vpkg.po
+ vpf = vpkg._files[fn]
+ model['Name'] = vpf.filename
+ model['SoftwareElementID'] = pkg.nevra
+ model['SoftwareElementState'] = pywbem.Uint16(2)
+ model['TargetOperatingSystem'] = pywbem.Uint16(
+ get_target_operating_system()[0])
+ model['Version'] = pkg.version
+ model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
+ if not keys_only:
+ #model['Caption'] = '' # TODO
+ #model['CheckMode'] = bool() # TODO
+ #model['CRC1'] = pywbem.Uint32() # TODO
+ #model['CRC2'] = pywbem.Uint32() # TODO
+ #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO
+ #model['Description'] = '' # TODO
+ #model['ElementName'] = '' # TODO
+ #model['InstanceID'] = '' # TODO
+ model['FileName'] = os.path.basename(vpf.filename)
+ model['ChecksumType'] = csumt = \
+ pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
+ if fc is None:
+ fc = SoftwareFileCheck.test_file(env, csumt, vpf)
+ for mattr, fattr in (
+ ('FileType', 'file_type'),
+ ('FileUserID', 'user_id'),
+ ('FileGroupID', 'group_id'),
+ ('FileMode', 'file_mode'),
+ ('LastModificationTime', 'last_modification_time'),
+ ('FileSize', 'file_size'),
+ ('LinkTarget', 'link_target'),
+ ('Checksum', 'checksum')):
+ exp, rea = getattr(fc, fattr)
+ if exp is not None:
+ model['Expected' + mattr] = exp
+ if rea is not None:
+ model[mattr] = rea
+ model['ExpectedFileModeFlags'] = \
+ SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0])
+ if fc.exists:
+ model['FileModeFlags'] = SoftwareFileCheck.mode2pywbem_flags(
+ fc.file_mode[1])
+ model['FileExists'] = fc.exists
+ if fc.md5_checksum is not None:
+ model['MD5Checksum'] = fc.md5_checksum
+ model['PassedFlags'] = _filecheck2model_flags(fc)
+ model['PassedFlagsDescriptions'] = list(
+ SoftwareFileCheck.passed_flags_descriptions)
+ return model
+
+ return filecheck2model
+