summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/util/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/openlmi/software/util/common.py')
-rw-r--r--src/software/openlmi/software/util/common.py825
1 files changed, 0 insertions, 825 deletions
diff --git a/src/software/openlmi/software/util/common.py b/src/software/openlmi/software/util/common.py
deleted file mode 100644
index 685171a..0000000
--- a/src/software/openlmi/software/util/common.py
+++ /dev/null
@@ -1,825 +0,0 @@
-# -*- encoding: utf-8 -*-
-# Software Management Providers
-#
-# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-#
-# Authors: Michal Minar <miminar@redhat.com>
-#
-
-"""Common utilities for LMI_Software* providers
-"""
-
-import collections
-from datetime import datetime
-import grp
-import hashlib
-import itertools
-import os
-import platform
-import pwd
-import re
-import rpm
-import socket
-import stat
-import pywbem
-import yum
-import cmpi_pywbem_bindings as pycimmb
-from openlmi.software.util import singletonmixin
-
-RE_EVRA = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+)$')
-RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
-
-RPMDB_PATH = '/var/lib/rpm/Packages'
-
-class YumDB(singletonmixin.Singleton):
- """
- Context manager for accessing yum/rpm database.
- """
-
- # this is to inform Singleton, that __init__ should be called only once
- ignoreSubsequent = True
-
- def __init__(self, env, *args, **kwargs): #pylint: disable=W0231
- if not isinstance(env, pycimmb.ProviderEnvironment):
- raise TypeError("env must be instance of"
- " pycimmb.ProviderEnvironment")
- self._yum_args = (args, kwargs)
- self._yum = None
- self._db_mtime = 0
- self._lock_cnt = 0
- self.env = env
- env.get_logger().log_info('init called')
-
- def is_dirty(self):
- """
- @return True if rpm database has been modified since last update
- """
- return self._db_mtime < os.stat(RPMDB_PATH).st_mtime
-
- def is_locked(self):
- """
- @return True if rpm database is locked
- """
- return self._yum._lockfile is not None
-
- def update_db(self):
- """
- Call to update database metadata.
- """
- self.env.get_logger().log_info('updating rpmdb')
- self._db_mtime = os.stat(RPMDB_PATH).st_mtime
- self._yum.doConfigSetup()
- self._yum.doTsSetup()
- self._yum.doRpmDBSetup()
-
- def __enter__(self):
- self._lock_cnt += 1
- if self._lock_cnt < 2:
- self._yum = yum.YumBase(*self._yum_args[0], **self._yum_args[1])
- if not self.is_locked() and self.is_dirty():
- self.update_db()
- self._yum.doLock()
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- if self._lock_cnt == 1:
- del self._yum
- self._lock_cnt -= 1
-
- def __getattr__(self, name):
- if not self.is_locked() and self.is_dirty():
- self.update_db()
- return getattr(self._yum, name)
-
-
-def _get_distname():
- """
- @return name of linux distribution
- """
- if hasattr(platform, 'linux_distribution'):
- return platform.linux_distribution(
- full_distribution_name=False)[0].lower()
- else:
- return platform.dist()[0].lower()
-
-
-def get_target_operating_system():
- """
- @return (val, text).
- Where val is a number from ValueMap of TargetOperatingSystem property
- of CIM_SoftwareElement class and text is its testual representation.
- """
-
- system = platform.system()
- if system.lower() == 'linux':
- try:
- val, dist = \
- { 'redhat' : (79, 'RedHat Enterprise Linux')
- , 'suse' : (81, 'SUSE')
- , 'mandriva' : (88, 'Mandriva')
- , 'ubuntu' : (93, 'Ubuntu')
- , 'debian' : (95, 'Debian')
- }[_get_distname()]
- except KeyError:
- linrel = platform.uname()[2]
- if linrel.startswith('2.4'):
- val, dist = (97, 'Linux 2.4.x')
- elif linrel.startswith('2.6'):
- val, dist = (99, 'Linux 2.6.x')
- else:
- return (36, 'LINUX') # no check for x86_64
- if platform.machine() == 'x86_64':
- val += 1
- dist += ' 64-Bit'
- return (val, dist)
- elif system.lower() in ('macosx', 'darwin'):
- return (2, 'MACOS')
- # elif system.lower() == 'windows': # no general value
- else:
- return (0, 'Unknown')
-
-def get_computer_system_op(prefix='Linux'):
- """
- @return object path of CIM_ComputerSystem for this system
- """
- return pywbem.CIMInstanceName(
- classname='%s_ComputerSystem' % prefix,
- keybindings={
- "CreationClassName": "%s_ComputerSystem" % prefix
- , "Name" : socket.gethostname() },
- namespace="root/cimv2")
-
-def check_target_operating_system(system):
- """
- @return if param system matches current target operating system
- """
- if isinstance(system, basestring):
- system = int(system)
- if not isinstance(system, (int, long)):
- raise TypeError("system must be either string or integer, not {}"
- .format(system.__class__.__name__))
- tos = get_target_operating_system()
- if system == tos:
- return True
- if system == 36: # linux
- if platform.system().lower() == "linux":
- return True
- if ( system >= 97 and system <= 100 # linux 2.x.x
- and platform.uname()[2].startswith('2.4' if system < 99 else '2.6')
- # check machine
- and ( bool(platform.machine().endswith('64'))
- == bool(not (system % 2)))):
- return True
- return False
-
-def check_computer_system_op(env, system):
- """
- @param system is object path referring to CIM_ComputerSystem instance
- passed as argument to some cim function
- @return True if this instance matches our system; otherwise a CIMError
- will be raised
- """
- if not isinstance(system, pywbem.CIMInstanceName):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "\"System\" must be a CIMInstanceName")
- our_system = get_computer_system_op('CIM')
- chandle = env.get_cimom_handle()
- if not chandle.is_subclass(system.namespace,
- sub=system.classname,
- super=our_system.classname):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Class of \"System\" must be a sublass of %s" %
- our_system.classname)
- if not 'CreationClassName' in system or not 'Name' in system:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "\"System\" is missing one of keys")
- if not chandle.is_subclass(system.namespace,
- sub=system['CreationClassName'],
- super=our_system.classname):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "CreationClassName of \"System\" must be a sublass of %s" %
- our_system.classname)
- if system['Name'] != our_system['Name']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Name of \"System\" does not match \"%s\"" %
- our_system['Name'])
- return True
-
-
-def match_pkg(pkg, **kwargs):
- """
- all not Null and not empty arguments will be matched against pkg
- attributes; if all of them match, return True
-
- possible arguments:
- name, epoch, version, release, arch
- evra, nevra
- """
- for attr in ( 'evra', 'nevra', 'name', 'epoch'
- , 'version', 'release', 'arch'):
- value = kwargs.get(attr, None)
- if value and getattr(pkg, attr) != value:
- return False
- return True
-
-class SoftwarePackage:
- """
- Just a namespace for common function related to SoftwarePackage provider.
- TODO: make it a submodule
- """
-
- @staticmethod
- def object_path2pkg(env, objpath, package_list='installed'):
- """
- @param objpath must contain precise information of package,
- otherwise a CIM_ERR_NOT_FOUND error is raised
- @param package_list one of {'installed', 'all', 'available'}
- says, where to look for given package
- """
- if not isinstance(objpath, pywbem.CIMInstanceName):
- raise TypeError("objpath must be an instance of CIMInstanceName")
- if not isinstance(package_list, basestring):
- raise TypeError("package_list must be a string")
- if not package_list in ('installed', 'all', 'available'):
- raise ValueError('unsupported package list "%s"'%package_list)
-
- if ( not objpath['Name'] or not objpath['SoftwareElementID']
- or not objpath['SoftwareElementID'].startswith(objpath['Name'])
- or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if not check_target_operating_system(objpath['TargetOperatingSystem']):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong target operating system.")
- if not objpath['Name'] or not objpath['Version']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- 'Both "Name" and "Version" must be given')
- match = RE_NEVRA.match(objpath['SoftwareElementID'])
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong SotwareElementID. Expected valid nevra"
- " (name-[epoch:]version-release.arch).")
- if objpath['Version'] != match.group('ver'):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Version does not match version part in SoftwareElementID.")
- evra = "{}:{}-{}.{}".format(*(
- (match.group(k) if k != "epoch" or match.group(k) else "0")
- for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as ydb:
- pkglist = ydb.doPackageLists(package_list,
- showdups=package_list != 'installed')
- if package_list != 'all':
- pkglist = getattr(pkglist, package_list)
- else:
- # NOTE: available ∩ installed = ∅
- pkglist = itertools.chain(pkglist.available, pkglist.installed)
- exact, _, _ = yum.packages.parsePackages(pkglist, [objpath['Name']])
- for pkg in yum.misc.unique(exact):
- if pkg.evra == evra:
- return pkg
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
-
- @staticmethod
- def object_path2pkg_search(env, objpath):
- """
- similar to object_path2pkg, but tries to find best suitable
- package matching keys
-
- If any matching package is already installed, it is returned.
- Otherwise available package with highest version is returned.
-
- @param objpath may be object of CIMInstance or CIMInstanceName and
- must contain at least \"Name\" or \"SoftwareElementID\"
- @return instance of yum.rpmsack.RPMInstalledPackage in case of
- installed package, otherwise yum.packages.YumAvailablePackage
- """
- logger = env.get_logger()
- if isinstance(objpath, pywbem.CIMInstance):
- def _get_key(k):
- """@return value of instance's key"""
- value = objpath.properties.get(k, None)
- if isinstance(value, pywbem.CIMProperty):
- return value.value
- if value is not None:
- return value
- logger.log_error('missing key "{}" in inst.props'.format(k))
- return objpath.path[k] if k in objpath.path else None
- elif isinstance(objpath, pywbem.CIMInstanceName):
- _get_key = lambda k: objpath[k] if k in objpath else None
- else:
- raise TypeError("objpath must be either CIMInstance"
- "or CIMInstanceName")
-
- # parse and check arguments
- match_props = {} # args for match_pkg
- if _get_key('SoftwareElementID'):
- match = RE_NEVRA.match(_get_key('SoftwareElementID'))
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER,
- "SoftwareElementID could not be parsed.")
- for k in ('name', 'version', 'release', 'arch'):
- mkey = k if k not in ('version', 'release') else k[:3]
- match_props[k] = match.group(mkey)
- if not match.group("epoch"):
- match_props["epoch"] = "0"
- else:
- for k in ('name', 'epoch', 'version', 'release', 'arch'):
- ikey = k if k != 'arch' else "architecture"
- if _get_key(ikey):
- match_props[k] = _get_key(ikey)
-
- if not match_props:
- raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
- "Too few key values given (give at least a Name).")
- if not 'name' in match_props:
- raise pywbem.CIMError(pywbem.CIM_ERR_FAILED,
- "Missing either Name or SoftwareElementID property.")
-
- # get available packages
- pkglist = YumDB.getInstance(env).doPackageLists('all', showdups=True)
- # NOTE: available ∩ installed = ∅
- exact, _, _ = yum.packages.parsePackages(
- itertools.chain(pkglist.available, pkglist.installed),
- [match_props['name']])
- exact = yum.misc.unique(exact)
- exact_orig = exact
- exact = sorted([ p for p in exact if match_pkg(p, **match_props) ])
- if len(exact) == 0:
- logger.log_error('could not find any package for query: {}'
- ' in list: {}'
- .format(match_props, [p.nevra for p in exact_orig]))
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package found.")
- for pkg in exact: # check, whether package is already installed
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- return pkg
- logger.log_info(('found multiple matching packages'
- ' for query: {}' if len(exact) > 1 else
- 'exact match found for query: {}')
- .format(match_props))
- return exact[-1] # select highest version
-
- @staticmethod
- def pkg2model(env, pkg, keys_only=True, model=None):
- """
- @param model if None, will be filled with data, otherwise
- a new instance of CIMInstance or CIMObjectPath is created
- """
- #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- #if not isinstance(pkg, yum.packages.YumHeaderPackage):
- if not isinstance(pkg, yum.packages.YumAvailablePackage):
- raise TypeError(
- "pkg must be an instance of YumAvailablePackage")
- if model is None:
- model = pywbem.CIMInstanceName('LMI_SoftwarePackage',
- namespace='root/cimv2')
- if not keys_only:
- model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model)
- if isinstance(model, pywbem.CIMInstance):
- def _set_key(k, value):
- """Sets the value of key property of cim instance"""
- model[k] = value
- model.path[k] = value #pylint: disable=E1103
- else:
- _set_key = model.__setitem__
- with YumDB.getInstance(env):
- _set_key('Name', pkg.name)
- _set_key('SoftwareElementID', pkg.nevra)
- _set_key('SoftwareElementState', pywbem.Uint16(2
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage)
- else 1))
- _set_key('TargetOperatingSystem',
- pywbem.Uint16(get_target_operating_system()[0]))
- _set_key('Version', pkg.version)
- if not keys_only:
- model['Caption'] = pkg.summary
- model['Description'] = pkg.description
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- model['InstallDate'] = pywbem.CIMDateTime(
- datetime.fromtimestamp(pkg.installtime))
- if pkg.vendor:
- model['Manufacturer'] = pkg.vendor
- model['Release'] = pkg.release
- model['Epoch'] = pywbem.Uint16(pkg.epoch)
- model["Architecture"] = pkg.arch
- model['License'] = pkg.license
- model['Group'] = pkg.group
- model['Size'] = pywbem.Uint64(pkg.size)
- return model
-
-class SoftwareFileCheck:
- """
- Just a namespace for functions related to FileCheck provider.
- TODO: make it a submodule
- """
-
- passed_flags_descriptions = (
- "Existence",
- "File Type",
- "File Size",
- "File Mode",
- "File Checksum",
- "Device major/minor number",
- "Symlink Target",
- "User Ownership", "Group Ownership",
- "Modify Time")
-
- checksumtype_str2num = dict((val, k) for (k, val) in
- yum.constants.RPM_CHECKSUM_TYPES.items())
-
- @staticmethod
- def pkg_checksum_type(pkg):
- """
- @return integer representation of checksum type
- """
- if not isinstance(pkg, yum.packages.YumAvailablePackage):
- raise TypeError("pkg must be an instance of YumAvailablePackage")
- if isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
- return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO]
- with YumDB.getInstance(): # ensure, that _yum is inited
- return SoftwareFileCheck.checksumtype_str2pywbem(
- pkg.yumdb_info.checksum_type)
-
- @staticmethod
- def checksumtype_num2hash(csumt):
- """
- @param csumt checksum type as a number obtained from package
- @return hash function object corresponding to csumt
- """
- return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt])
-
- @staticmethod
- def checksumtype_str2pywbem(alg):
- """
- @param alg is a name of algorithm used for checksum
- @return pywbem number corresponding to given alg
- """
- try:
- res = SoftwareFileCheck.checksumtype_str2num[alg.lower()]
- except KeyError:
- res = 0
- return pywbem.Uint16(res)
-
- @staticmethod
- def filetype_str2pywbem(file_type):
- """
- @param file_type is a name of file type obtained from pkg headers
- @return pywbem number corresponding to thus file type
- """
- try:
- return pywbem.Uint16(
- { 'file' : 1
- , 'directory' : 2
- , 'symlink' : 3
- , 'fifo' : 4
- , 'character device' : 5
- , 'block device' : 6
- }[file_type])
- except KeyError:
- return pywbem.Uint16(0)
-
- @staticmethod
- def filetype_mode2pywbem(mode):
- """
- @param mode is a raw file mode as integer
- @return pywbem numeric value of file's type
- """
- for i, name in enumerate(
- ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1):
- if getattr(stat, 'S_IS' + name)(mode):
- return pywbem.Uint16(i)
- return pywbem.Uint16(0)
-
- @staticmethod
- def mode2pywbem_flags(mode):
- """
- @param mode if None, file does not exist
- @return list of integer flags describing file's access permissions
- """
- if mode is None:
- return None
- flags = []
- for i, flag in enumerate((
- stat.S_IXOTH,
- stat.S_IWOTH,
- stat.S_IROTH,
- stat.S_IXGRP,
- stat.S_IWGRP,
- stat.S_IRGRP,
- stat.S_IXUSR,
- stat.S_IWUSR,
- stat.S_IRUSR,
- stat.S_ISVTX,
- stat.S_ISGID,
- stat.S_ISUID)):
- if flag & mode:
- flags.append(pywbem.Uint8(i))
- return flags
-
- @staticmethod
- def hashfile(afile, hashers, blocksize=65536):
- """
- @param hashers is a list of hash objects
- @return list of digest strings (in hex format) for each hash object
- given in the same order
- """
- if not isinstance(hashers, (tuple, list, set, frozenset)):
- hashers = (hashers, )
- buf = afile.read(blocksize)
- while len(buf) > 0:
- for hashfunc in hashers:
- hashfunc.update(buf)
- buf = afile.read(blocksize)
- return [ hashfunc.hexdigest() for hashfunc in hashers ]
-
- @staticmethod
- def compute_checksums(env, checksum_type, file_type, file_path):
- """
- @param file_type is not a file, then zeroes are returned
- @param checksum_type selected hash algorithm to compute second
- checksum
- @return (md5sum, checksum)
- both checksums are computed from file_path's content
- first one is always md5, the second one depends on checksum_type
- if file does not exists, (None, None) is returned
- """
- hashers = [hashlib.md5()] #pylint: disable=E1101
- if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]:
- hashers.append(SoftwareFileCheck.checksumtype_num2hash(
- checksum_type)())
- if file_type != SoftwareFileCheck.filetype_str2pywbem('file'):
- rslts = ['0'*len(h.hexdigest()) for h in hashers]
- else:
- try:
- with open(file_path, 'rb') as fobj:
- rslts = SoftwareFileCheck.hashfile(fobj, hashers)
- except (OSError, IOError) as exc:
- env.get_logger().log_error("could not open file \"%s\""
- " for reading: %s" % (file_path, exc))
- return None, None
- return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2)
-
- @staticmethod
- def object_path2yumcheck(env, objpath):
- """
- @return instance of yum.packages._RPMVerifyPackage
- this object holds RPMInstalledPackage under its po attribute
- """
- if not isinstance(objpath, pywbem.CIMInstanceName):
- raise TypeError("objpath must be instance of CIMInstanceName, "
- "not \"%s\"" % objpath.__class__.__name__)
-
- if ( not objpath['Name'] or not objpath['SoftwareElementID']
- or not objpath['CheckID']
- or not objpath['CheckID'].endswith('#'+objpath['Name'])
- or objpath['SoftwareElementID'].find(objpath['Version']) == -1):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.")
- if objpath['SoftwareElementState'] not in ("2", 2):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Only \"Executable\" software element state supported")
- if not check_target_operating_system(objpath['TargetOperatingSystem']):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong target operating system.")
- if not objpath['Name'] or not objpath['Version']:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- 'Both "Name" and "Version" must be given')
- match = RE_NEVRA.match(objpath['SoftwareElementID'])
- if not match:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Wrong SotwareElementID. Expected valid nevra"
- " (name-epoch:version-release.arch).")
- if objpath['Version'] != match.group('ver'):
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "Version does not match version part in SoftwareElementID.")
-
- evra = "{}:{}-{}.{}".format(*(
- (match.group(k) if k != "epoch" or match.group(k) else "0")
- for k in ("epoch", 'ver', 'rel', 'arch')))
- with YumDB.getInstance(env) as ydb:
- pkglist = ydb.doPackageLists('installed')
- exact, _, _ = yum.packages.parsePackages(
- pkglist.installed, [match.group('name')])
- for pkg in yum.misc.unique(exact):
- if pkg.evra != evra:
- continue
- vpkg = yum.packages._RPMVerifyPackage(
- pkg, pkg.hdr.fiFromHeader(),
- SoftwareFileCheck.pkg_checksum_type(pkg), [], True)
- if not objpath['Name'] in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File not found in RPM package.")
- return vpkg
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "No matching package installed.")
-
- # Named tuple to store results of rpm file check as pywbem values,
- # all results are in form:
- # (expected, reality)
- # where
- # expected is value from rpm package
- # reality is value obtained from installed file
- # None means, that value could not be obtained
- # except for "exists" and "md5_checksum" attributes, where "exists"
- # is boolean and md5_checksum is a string
- # for example:
- # file_check.file_type == (4, 3)
- FileCheck = collections.namedtuple('FileCheck',
- 'exists, md5_checksum, file_type, file_size, file_mode, '
- 'file_checksum, device, link_target, user_id, group_id, '
- 'last_modification_time')
-
- @staticmethod
- def test_file(env, checksum_type, vpf):
- """
- @param checksum type is a pywbem value for ChecksumType property
- @return instance of FileCheck
- """
- if not isinstance(vpf, yum.packages._RPMVerifyPackageFile):
- raise TypeError("vpf must be an instance of _RPMVerifyPackage,"
- " not \"%s\"" % vpf.__class__.__name__)
- exists = os.path.lexists(vpf.filename)
- md5_checksum = None
- expected = {
- "file_type" : SoftwareFileCheck.filetype_str2pywbem(vpf.ftype),
- "user_id" : pywbem.Uint32(pwd.getpwnam(vpf.user).pw_uid),
- "group_id" : pywbem.Uint32(grp.getgrnam(vpf.group).gr_gid),
- "file_mode" : pywbem.Uint32(vpf.mode),
- "file_size" : pywbem.Uint64(vpf.size),
- "link_target" : vpf.readlink if vpf.readlink else None,
- "file_checksum" : vpf.digest[1],
- "device" : (pywbem.Uint64(vpf.dev)
- if vpf.ftype.endswith('device') else None),
- "last_modification_time" : pywbem.Uint64(vpf.mtime)
- }
- if not exists:
- reality = collections.defaultdict(lambda: None)
- else:
- fstat = os.lstat(vpf.filename)
- reality = {
- "file_type" : SoftwareFileCheck.filetype_mode2pywbem(
- fstat.st_mode),
- "user_id" : pywbem.Uint32(fstat.st_uid),
- "group_id" : pywbem.Uint32(fstat.st_gid),
- "file_mode" : pywbem.Uint32(fstat.st_mode),
- "file_size" : pywbem.Uint64(fstat.st_size),
- "last_modification_time" : pywbem.Uint64(fstat.st_mtime)
- }
- reality["device"] = (pywbem.Uint64(fstat.st_dev)
- if reality['file_type'] ==
- SoftwareFileCheck.filetype_str2pywbem("device") else None)
- reality["link_target"] = (os.readlink(vpf.filename)
- if os.path.islink(vpf.filename) else None)
- md5_checksum, checksum = SoftwareFileCheck.compute_checksums(
- env, checksum_type, reality["file_type"], vpf.filename)
- reality["file_checksum"] = checksum
- kwargs = dict(exists=exists, md5_checksum=md5_checksum,
- **dict((k, (expected[k], reality[k])) for k in expected))
- return SoftwareFileCheck.FileCheck(**kwargs)
-
- @staticmethod
- def filecheck_passed(file_check):
- """
- @return True if installed file passed all checks.
- """
- if not isinstance(file_check, SoftwareFileCheck.FileCheck):
- raise TypeError("file_check must be an instance of FileCheck")
- passed = file_check.exists
- for k, val in file_check._asdict().items():
- if not passed:
- break
- if not isinstance(val, tuple):
- continue
- if ( k == "last_modification_time"
- and file_check.file_type[0] != \
- SoftwareFileCheck.filetype_str2pywbem("file")):
- continue
- if ( k == "file_mode"
- and file_check.file_type[0] == \
- SoftwareFileCheck.filetype_str2pywbem("symlink")):
- continue
- passed = val[0] == val[1]
- return passed
-
- @staticmethod
- def _filecheck2model_flags(file_check):
- """
- @param file_check is an instance of FileCheck
- @return pywbem value for PassedFlags property
- """
- flags = []
- for k, value in file_check._asdict().items(): #pylint: disable=W0212
- if isinstance(value, tuple):
- if ( k == "last_modification_time"
- and file_check.file_type[0] != \
- SoftwareFileCheck.filetype_str2pywbem('file')):
- # last_modification_time check is valid only for
- # regular files
- flag = file_check.exists
- elif ( k == "file_mode"
- and file_check.file_type[0] == \
- SoftwareFileCheck.filetype_str2pywbem('symlink')):
- # do not check mode of symlinks
- flag = ( file_check.exists
- and ( file_check.file_type[0]
- == file_check.file_type[1]))
- else:
- flag = file_check.exists and value[0] == value[1]
- flags.append(flag)
- elif isinstance(value, bool):
- flags.append(value)
- return flags
-
- @staticmethod
- def _fill_non_key_values(env, model, pkg, vpf, file_check):
- """
- Fills a non key values into instance of SoftwareFileCheck.
- """
- model['FileName'] = os.path.basename(vpf.filename)
- model['FileChecksumType'] = csumt = \
- pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg))
- if file_check is None:
- file_check = SoftwareFileCheck.test_file(env, csumt, vpf)
- for mattr, fattr in (
- ('FileType', 'file_type'),
- ('FileUserID', 'user_id'),
- ('FileGroupID', 'group_id'),
- ('FileMode', 'file_mode'),
- ('LastModificationTime', 'last_modification_time'),
- ('FileSize', 'file_size'),
- ('LinkTarget', 'link_target'),
- ('FileChecksum', 'file_checksum')):
- exp, rea = getattr(file_check, fattr)
- if exp is not None:
- model['Expected' + mattr] = exp
- if rea is not None:
- model[mattr] = rea
- model['ExpectedFileModeFlags'] = \
- SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[0])
- if file_check.exists:
- model['FileModeFlags'] = \
- SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[1])
- model['FileExists'] = file_check.exists
- if file_check.md5_checksum is not None:
- model['MD5Checksum'] = file_check.md5_checksum
- model['PassedFlags'] = SoftwareFileCheck._filecheck2model_flags(
- file_check)
- model['PassedFlagsDescriptions'] = list(
- SoftwareFileCheck.passed_flags_descriptions)
-
- @staticmethod
- def filecheck2model(vpkg, file_name, env, keys_only=True,
- model=None, file_check=None):
- """
- @param vpkg is an instance of yum.packages_RPMVerifyPackage
- @param file_name a absolute file path contained in package
- @param keys_only if True, then only key values will be filed
- @param model if given, then this instance will be modified and
- returned
- @param file_check if not given, it will be computed
- @return instance of LMI_SoftwareFileCheck class with all desired
- values filed
- """
- if not isinstance(vpkg, yum.packages._RPMVerifyPackage):
- raise TypeError(
- "vpkg must be an instance of _RPMVerifyPackage")
- if not file_name in vpkg:
- raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
- "File \"%s\" not found among package files" % file_name)
- if model is None:
- model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck",
- namespace="root/cimv2")
- if not keys_only:
- model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model)
- if file_check is not None:
- if not isinstance(file_check, SoftwareFileCheck.FileCheck):
- raise TypeError("file_check must be an instance of FileCheck")
- pkg = vpkg.po
- vpf = vpkg._files[file_name]
- model['Name'] = vpf.filename
- model['SoftwareElementID'] = pkg.nevra
- model['SoftwareElementState'] = pywbem.Uint16(2)
- model['TargetOperatingSystem'] = pywbem.Uint16(
- get_target_operating_system()[0])
- model['Version'] = pkg.version
- model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename)
- if not keys_only:
- SoftwareFileCheck._fill_non_key_values(
- env, model, pkg, vpf, file_check)
- return model
-