diff options
Diffstat (limited to 'src/software/openlmi/software/util/common.py')
-rw-r--r-- | src/software/openlmi/software/util/common.py | 665 |
1 files changed, 334 insertions, 331 deletions
diff --git a/src/software/openlmi/software/util/common.py b/src/software/openlmi/software/util/common.py index 7b54f1e..7deef32 100644 --- a/src/software/openlmi/software/util/common.py +++ b/src/software/openlmi/software/util/common.py @@ -40,15 +40,19 @@ import yum import cmpi_pywbem_bindings as pycimmb from openlmi.software.util import singletonmixin -re_evra = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)' +RE_EVRA = re.compile(r'^(?P<epoch>\d+):(?P<ver>[^-]+)' r'-(?P<rel>.+)\.(?P<arch>[^.]+)$') -re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' +RE_NEVRA = re.compile(r'^(?P<name>.+)-(?P<evra>((?P<epoch>\d+):)?(?P<ver>[^-]+)' r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') -rpmdb_path = '/var/lib/rpm/Packages' +RPMDB_PATH = '/var/lib/rpm/Packages' class YumDB(singletonmixin.Singleton): + """ + Context manager for accessing yum/rpm database. + """ + # this is to inform Singleton, that __init__ should be called only once ignoreSubsequent = True def __init__(self, env, *args, **kwargs): @@ -63,14 +67,23 @@ class YumDB(singletonmixin.Singleton): env.get_logger().log_info('init called') def is_dirty(self): - return self._db_mtime < os.stat(rpmdb_path).st_mtime + """ + @return True if rpm database has been modified since last update + """ + return self._db_mtime < os.stat(RPMDB_PATH).st_mtime def is_locked(self): + """ + @return True if rpm database is locked + """ return self._yum._lockfile is not None def update_db(self): + """ + Call to update database metadata. + """ self.env.get_logger().log_info('updating rpmdb') - self._db_mtime = os.stat(rpmdb_path).st_mtime + self._db_mtime = os.stat(RPMDB_PATH).st_mtime self._yum.doConfigSetup() self._yum.doTsSetup() self._yum.doRpmDBSetup() @@ -96,6 +109,9 @@ class YumDB(singletonmixin.Singleton): def _get_distname(): + """ + @return name of linux distribution + """ if hasattr(platform, 'linux_distribution'): return platform.linux_distribution( full_distribution_name=False)[0].lower() @@ -138,11 +154,14 @@ def get_target_operating_system(): else: return (0, 'Unknown') -def get_computer_system_op(): +def get_computer_system_op(prefix='Linux'): + """ + @return object path of CIM_ComputerSystem for this system + """ return pywbem.CIMInstanceName( - classname='CIM_ComputerSystem', + classname='%s_ComputerSystem' % prefix, keybindings={ - "CreationClassName": "CIM_ComputerSystem" + "CreationClassName": "%s_ComputerSystem" % prefix , "Name" : socket.gethostname() }, namespace="root/cimv2") @@ -156,9 +175,11 @@ def check_target_operating_system(system): raise TypeError("system must be either string or integer, not {}" .format(system.__class__.__name__)) tos = get_target_operating_system() - if system == tos: return True + if system == tos: + return True if system == 36: # linux - if platform.system().lower() == "linux": return True + if platform.system().lower() == "linux": + return True if ( system >= 97 and system <= 100 # linux 2.x.x and platform.uname()[2].startswith('2.4' if system < 99 else '2.6') # check machine @@ -168,12 +189,18 @@ def check_target_operating_system(system): return False def check_computer_system_op(env, system): + """ + @param system is object path referring to CIM_ComputerSystem instance + passed as argument to some cim function + @return True if this instance matches our system; otherwise a CIMError + will be raised + """ if not isinstance(system, pywbem.CIMInstanceName): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "\"System\" must be a CIMInstanceName") - our_system = get_computer_system_op() - ch = env.get_cimom_handle() - if not ch.is_subclass(system.namespace, + our_system = get_computer_system_op('CIM') + chandle = env.get_cimom_handle() + if not chandle.is_subclass(system.namespace, sub=system.classname, super=our_system.classname): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, @@ -182,7 +209,7 @@ def check_computer_system_op(env, system): if not 'CreationClassName' in system or not 'Name' in system: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "\"System\" is missing one of keys") - if not ch.is_subclass(system.namespace, + if not chandle.is_subclass(system.namespace, sub=system['CreationClassName'], super=our_system.classname): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, @@ -204,128 +231,72 @@ def match_pkg(pkg, **kwargs): name, epoch, version, release, arch evra, nevra """ - for a in ('evra', 'nevra', 'name', 'epoch', 'version', 'release', 'arch'): - v = kwargs.get(a, None) - if v and getattr(pkg, a) != v: + for attr in ( 'evra', 'nevra', 'name', 'epoch' + , 'version', 'release', 'arch'): + value = kwargs.get(attr, None) + if value and getattr(pkg, attr) != value: return False return True class SoftwarePackage: """ Just a namespace for common function related to SoftwarePackage provider. + TODO: make it a submodule """ @staticmethod - def parse_group(group, env): - try: - res = \ - { "amusements/games" : 3 - , "amusements/graphics" : 4 - , "applications/archiving" : 5 - , "applications/communications" : 6 - , "applications/databases" : 7 - , "applications/editors" : 8 - , "applications/emulators" : 9 - , "applications/engineering" : 10 - , "applications/file" : 11 - , "applications/internet" : 12 - , "applications/multimedia" : 13 - , "applications/productivity" : 14 - , "applications/publishing" : 15 - , "applications/system" : 16 - , "applications/text" : 17 - , "development/build tools" : 18 - , "development/debug" : 19 - , "development/debuggers" : 20 - , "development/documentation" : 21 - , "development/java" : 22 - , "development/languages" : 23 - , "development/libraries" : 24 - , "development/libraries/java" : 25 - , "development/system" : 26 - , "development/tools" : 27 - , "documentation" : 28 - , "internet/www/dynamic content" : 29 - , "system/libraries" : 30 - , "system environment/base" : 31 - , "system environment/daemons" : 32 - , "system environment/kernel" : 33 - , "system environment/libraries" : 34 - , "system environment/shells" : 35 - , "text processing/markup/xml" : 36 - , "user interface/desktops" : 37 - , "user interface/x" : 38 - , "user interface/x hardware support" : 39 - , "utilities" : 40 - }[group.lower()] - except KeyError: - logger = env.get_logger() - if not group or group.lower() == "unspecified": - logger.log_info("unspecified group '{}'".format(group)) - res = 2 - else: - logger.log_error("failed to parse group '{}'".format(group)) - res = 0 - return pywbem.Uint16(res) - - @staticmethod - def object_path2pkg(env, op, package_list='installed'): + def object_path2pkg(env, objpath, package_list='installed'): """ - @param op must contain precise information of package, + @param objpath must contain precise information of package, otherwise a CIM_ERR_NOT_FOUND error is raised @param package_list one of {'installed', 'all', 'available'} says, where to look for given package """ - if not isinstance(op, pywbem.CIMInstanceName): - raise TypeError("op must be an instance of CIMInstanceName") + if not isinstance(objpath, pywbem.CIMInstanceName): + raise TypeError("objpath must be an instance of CIMInstanceName") if not isinstance(package_list, basestring): raise TypeError("package_list must be a string") if not package_list in ('installed', 'all', 'available'): raise ValueError('unsupported package list "%s"'%package_list) - tos = get_target_operating_system()[0] - if ( not op['Name'] or not op['SoftwareElementID'] - or not op['SoftwareElementID'].startswith(op['Name']) - or op['SoftwareElementID'].find(op['Version']) == -1): + if ( not objpath['Name'] or not objpath['SoftwareElementID'] + or not objpath['SoftwareElementID'].startswith(objpath['Name']) + or objpath['SoftwareElementID'].find(objpath['Version']) == -1): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") -# if op['SoftwareElementState'] not in ("2", 2): -# raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, -# "Only \"Executable\" software element state supported") - if not check_target_operating_system(op['TargetOperatingSystem']): + if not check_target_operating_system(objpath['TargetOperatingSystem']): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong target operating system.") - if not op['Name'] or not op['Version']: + if not objpath['Name'] or not objpath['Version']: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, 'Both "Name" and "Version" must be given') - m = re_nevra.match(op['SoftwareElementID']) - if not m: + match = RE_NEVRA.match(objpath['SoftwareElementID']) + if not match: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong SotwareElementID. Expected valid nevra" " (name-[epoch:]version-release.arch).") - if op['Version'] != m.group('ver'): + if objpath['Version'] != match.group('ver'): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Version does not match version part in SoftwareElementID.") evra = "{}:{}-{}.{}".format(*( - (m.group(k) if k != "epoch" or m.group(k) else "0") + (match.group(k) if k != "epoch" or match.group(k) else "0") for k in ("epoch", 'ver', 'rel', 'arch'))) - with YumDB.getInstance(env) as yb: - pl = yb.doPackageLists(package_list, + with YumDB.getInstance(env) as ydb: + pkglist = ydb.doPackageLists(package_list, showdups=package_list != 'installed') if package_list != 'all': - pl = getattr(pl, package_list) + pkglist = getattr(pkglist, package_list) else: # NOTE: available ∩ installed = ∅ - pl = itertools.chain(pl.available, pl.installed) - exact,_,_ = yum.packages.parsePackages(pl, [op['Name']]) + pkglist = itertools.chain(pkglist.available, pkglist.installed) + exact, _, _ = yum.packages.parsePackages(pkglist, [objpath['Name']]) for pkg in yum.misc.unique(exact): - if pkg.evra == evra: break - else: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "No matching package found.") - return pkg + if pkg.evra == evra: + return pkg + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "No matching package found.") @staticmethod - def object_path2pkg_search(env, op): + def object_path2pkg_search(env, objpath): """ similar to object_path2pkg, but tries to find best suitable package matching keys @@ -333,40 +304,45 @@ class SoftwarePackage: If any matching package is already installed, it is returned. Otherwise available package with highest version is returned. - @param op may be object of CIMInstance or CIMInstanceName and + @param objpath may be object of CIMInstance or CIMInstanceName and must contain at least \"Name\" or \"SoftwareElementID\" @return instance of yum.rpmsack.RPMInstalledPackage in case of installed package, otherwise yum.packages.YumAvailablePackage """ logger = env.get_logger() - if isinstance(op, pywbem.CIMInstance): + if isinstance(objpath, pywbem.CIMInstance): def _get_key(k): - v = op.properties.get(k, None) - if isinstance(v, pywbem.CIMProperty): return v.value - if v is not None: return v + """@return value of instance's key""" + value = objpath.properties.get(k, None) + if isinstance(value, pywbem.CIMProperty): + return value.value + if value is not None: + return value logger.log_error('missing key "{}" in inst.props'.format(k)) - return op.path[k] if k in op.path else None - elif isinstance(op, pywbem.CIMInstanceName): - _get_key = lambda k: op[k] if k in op else None + return objpath.path[k] if k in objpath.path else None + elif isinstance(objpath, pywbem.CIMInstanceName): + _get_key = lambda k: objpath[k] if k in objpath else None else: - raise TypeError("op must be either CIMInstance or CIMInstanceName") + raise TypeError("objpath must be either CIMInstance" + "or CIMInstanceName") # parse and check arguments match_props = {} # args for match_pkg if _get_key('SoftwareElementID'): - m = re_nevra.match(_get_key('SoftwareElementID')) - if not m: + match = RE_NEVRA.match(_get_key('SoftwareElementID')) + if not match: raise pywbem.CIMError(pywbem.CIM_ERR_INVALID_PARAMETER, "SoftwareElementID could not be parsed.") for k in ('name', 'version', 'release', 'arch'): - mk = k if k not in ('version', 'release') else k[:3] - match_props[k] = m.group(mk) - if not m.group("epoch"): + mkey = k if k not in ('version', 'release') else k[:3] + match_props[k] = match.group(mkey) + if not match.group("epoch"): match_props["epoch"] = "0" else: for k in ('name', 'epoch', 'version', 'release', 'arch'): - ik = k if k != 'arch' else "architecture" - if _get_key(ik): match_props[k] = _get_key(ik) + ikey = k if k != 'arch' else "architecture" + if _get_key(ikey): + match_props[k] = _get_key(ikey) if not match_props: raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, @@ -376,10 +352,10 @@ class SoftwarePackage: "Missing either Name or SoftwareElementID property.") # get available packages - pl = YumDB.getInstance(env).doPackageLists('all', showdups=True) + pkglist = YumDB.getInstance(env).doPackageLists('all', showdups=True) # NOTE: available ∩ installed = ∅ - exact,_,_ = yum.packages.parsePackages( - itertools.chain(pl.available, pl.installed), + exact, _, _ = yum.packages.parsePackages( + itertools.chain(pkglist.available, pkglist.installed), [match_props['name']]) exact = yum.misc.unique(exact) exact_orig = exact @@ -401,63 +377,57 @@ class SoftwarePackage: return exact[-1] # select highest version @staticmethod - def pkg2model_wrapper(namespace, classname): - """ - @return a function that transforms YumAvailablePackage object - to CIMInstanceName object - """ - - tos = pywbem.Uint16(get_target_operating_system()[0]) - - def pkg2model(env, pkg, keys_only=True, model=None): - """ - @param model if None, will be filled with data, otherwise - a new instance of CIMInstance or CIMObjectPath is created - """ - #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - #if not isinstance(pkg, yum.packages.YumHeaderPackage): - if not isinstance(pkg, yum.packages.YumAvailablePackage): - raise TypeError( - "pkg must be an instance of YumAvailablePackage") - if model is None: - model = pywbem.CIMInstanceName(classname, namespace=namespace) - if not keys_only: - model = pywbem.CIMInstance(classname, path=model) - if isinstance(model, pywbem.CIMInstance): - def _set_key(k, v): - model[k] = v - model.path[k] = v - else: - _set_key = model.__setitem__ - with YumDB.getInstance(env): - _set_key('Name', pkg.name) - _set_key('SoftwareElementID', pkg.nevra) - _set_key('SoftwareElementState', pywbem.Uint16(2 - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage) - else 1)) - _set_key('TargetOperatingSystem', tos) - _set_key('Version', pkg.version) - if not keys_only: - model['Caption'] = pkg.summary - model['Description'] = pkg.description - if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): - model['InstallDate'] = pywbem.CIMDateTime( - datetime.fromtimestamp(pkg.installtime)) - if pkg.vendor: - model['Manufacturer'] = pkg.vendor - model['Release'] = pkg.release - model['Epoch'] = pywbem.Uint16(pkg.epoch) - model["Architecture"] = pkg.arch - model['License'] = pkg.license - model['Group'] = pkg.group - model['Size'] = pywbem.Uint64(pkg.size) - return model - - return pkg2model + def pkg2model(env, pkg, keys_only=True, model=None): + """ + @param model if None, will be filled with data, otherwise + a new instance of CIMInstance or CIMObjectPath is created + """ + #if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + #if not isinstance(pkg, yum.packages.YumHeaderPackage): + if not isinstance(pkg, yum.packages.YumAvailablePackage): + raise TypeError( + "pkg must be an instance of YumAvailablePackage") + if model is None: + model = pywbem.CIMInstanceName('LMI_SoftwarePackage', + namespace='root/cimv2') + if not keys_only: + model = pywbem.CIMInstance('LMI_SoftwarePackage', path=model) + if isinstance(model, pywbem.CIMInstance): + def _set_key(k, value): + """Sets the value of key property of cim instance""" + model[k] = value + model.path[k] = value #pylint: disable=E1103 + else: + _set_key = model.__setitem__ + with YumDB.getInstance(env): + _set_key('Name', pkg.name) + _set_key('SoftwareElementID', pkg.nevra) + _set_key('SoftwareElementState', pywbem.Uint16(2 + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage) + else 1)) + _set_key('TargetOperatingSystem', + pywbem.Uint16(get_target_operating_system()[0])) + _set_key('Version', pkg.version) + if not keys_only: + model['Caption'] = pkg.summary + model['Description'] = pkg.description + if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + model['InstallDate'] = pywbem.CIMDateTime( + datetime.fromtimestamp(pkg.installtime)) + if pkg.vendor: + model['Manufacturer'] = pkg.vendor + model['Release'] = pkg.release + model['Epoch'] = pywbem.Uint16(pkg.epoch) + model["Architecture"] = pkg.arch + model['License'] = pkg.license + model['Group'] = pkg.group + model['Size'] = pywbem.Uint64(pkg.size) + return model class SoftwareFileCheck: """ Just a namespace for functions related to FileCheck provider. + TODO: make it a submodule """ passed_flags_descriptions = ( @@ -471,7 +441,7 @@ class SoftwareFileCheck: "User Ownership", "Group Ownership", "Modify Time") - checksumtype_str2num = dict((v, k) for (k, v) in + checksumtype_str2num = dict((val, k) for (k, val) in yum.constants.RPM_CHECKSUM_TYPES.items()) @staticmethod @@ -483,23 +453,36 @@ class SoftwareFileCheck: raise TypeError("pkg must be an instance of YumAvailablePackage") if isinstance(pkg, yum.rpmsack.RPMInstalledPackage): return pkg.hdr[rpm.RPMTAG_FILEDIGESTALGO] - with self: # ensure, that _yum is inited + with YumDB.getInstance(): # ensure, that _yum is inited return SoftwareFileCheck.checksumtype_str2pywbem( pkg.yumdb_info.checksum_type) @staticmethod def checksumtype_num2hash(csumt): + """ + @param csumt checksum type as a number obtained from package + @return hash function object corresponding to csumt + """ return getattr(hashlib, yum.constants.RPM_CHECKSUM_TYPES[csumt]) @staticmethod def checksumtype_str2pywbem(alg): + """ + @param alg is a name of algorithm used for checksum + @return pywbem number corresponding to given alg + """ try: res = SoftwareFileCheck.checksumtype_str2num[alg.lower()] - except KeyError: res = 0 + except KeyError: + res = 0 return pywbem.Uint16(res) @staticmethod - def filetype_str2pywbem(ft): + def filetype_str2pywbem(file_type): + """ + @param file_type is a name of file type obtained from pkg headers + @return pywbem number corresponding to thus file type + """ try: return pywbem.Uint16( { 'file' : 1 @@ -508,12 +491,16 @@ class SoftwareFileCheck: , 'fifo' : 4 , 'character device' : 5 , 'block device' : 6 - }[ft]) + }[file_type]) except KeyError: return pywbem.Uint16(0) @staticmethod def filetype_mode2pywbem(mode): + """ + @param mode is a raw file mode as integer + @return pywbem numeric value of file's type + """ for i, name in enumerate( ('REG', 'DIR', 'LNK', 'FIFO', 'CHR', 'BLK'), 1): if getattr(stat, 'S_IS' + name)(mode): @@ -524,10 +511,12 @@ class SoftwareFileCheck: def mode2pywbem_flags(mode): """ @param mode if None, file does not exist + @return list of integer flags describing file's access permissions """ - if mode is None: return None + if mode is None: + return None flags = [] - for i, c in enumerate(( + for i, flag in enumerate(( stat.S_IXOTH, stat.S_IWOTH, stat.S_IROTH, @@ -540,7 +529,7 @@ class SoftwareFileCheck: stat.S_ISVTX, stat.S_ISGID, stat.S_ISUID)): - if c & mode: + if flag & mode: flags.append(pywbem.Uint8(i)) return flags @@ -555,9 +544,10 @@ class SoftwareFileCheck: hashers = (hashers, ) buf = afile.read(blocksize) while len(buf) > 0: - for h in hashers: h.update(buf) + for hashfunc in hashers: + hashfunc.update(buf) buf = afile.read(blocksize) - return [ h.hexdigest() for h in hashers ] + return [ hashfunc.hexdigest() for hashfunc in hashers ] @staticmethod def compute_checksums(env, checksum_type, file_type, file_path): @@ -570,91 +560,86 @@ class SoftwareFileCheck: first one is always md5, the second one depends on checksum_type if file does not exists, (None, None) is returned """ - hashers = [hashlib.md5()] + hashers = [hashlib.md5()] #pylint: disable=E1101 if checksum_type != SoftwareFileCheck.checksumtype_str2num["md5"]: - hashers.append(SoftwareFileCheck.checksumtype_num2hash(checksum_type)()) + hashers.append(SoftwareFileCheck.checksumtype_num2hash( + checksum_type)()) if file_type != SoftwareFileCheck.filetype_str2pywbem('file'): rslts = ['0'*len(h.hexdigest()) for h in hashers] else: try: - with open(file_path, 'rb') as f: - rslts = SoftwareFileCheck.hashfile(f, hashers) - except (OSError, IOError) as e: + with open(file_path, 'rb') as fobj: + rslts = SoftwareFileCheck.hashfile(fobj, hashers) + except (OSError, IOError) as exc: env.get_logger().log_error("could not open file \"%s\"" - " for reading: %s" % (file_path, e)) + " for reading: %s" % (file_path, exc)) return None, None return (rslts[0], rslts[1] if len(rslts) > 1 else rslts[0]*2) @staticmethod - def object_path2yumcheck(env, op): + def object_path2yumcheck(env, objpath): """ @return instance of yum.packages._RPMVerifyPackage this object holds RPMInstalledPackage under its po attribute """ - if not isinstance(op, pywbem.CIMInstanceName): - raise TypeError("op must be instance of CIMInstanceName, " - "not \"%s\"" % op.__class__.__name__) - log = env.get_logger() - - tos = get_target_operating_system()[0] - if ( not op['Name'] or not op['SoftwareElementID'] - or not op['CheckID'] - or not op['CheckID'].endswith('#'+op['Name']) - or op['SoftwareElementID'].find(op['Version']) == -1): + if not isinstance(objpath, pywbem.CIMInstanceName): + raise TypeError("objpath must be instance of CIMInstanceName, " + "not \"%s\"" % objpath.__class__.__name__) + + if ( not objpath['Name'] or not objpath['SoftwareElementID'] + or not objpath['CheckID'] + or not objpath['CheckID'].endswith('#'+objpath['Name']) + or objpath['SoftwareElementID'].find(objpath['Version']) == -1): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong keys.") - if op['SoftwareElementState'] not in ("2", 2): + if objpath['SoftwareElementState'] not in ("2", 2): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Only \"Executable\" software element state supported") - if not check_target_operating_system(op['TargetOperatingSystem']): + if not check_target_operating_system(objpath['TargetOperatingSystem']): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong target operating system.") - if not op['Name'] or not op['Version']: + if not objpath['Name'] or not objpath['Version']: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, 'Both "Name" and "Version" must be given') - m = re_nevra.match(op['SoftwareElementID']) - if not m: + match = RE_NEVRA.match(objpath['SoftwareElementID']) + if not match: raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Wrong SotwareElementID. Expected valid nevra" " (name-epoch:version-release.arch).") - if op['Version'] != m.group('ver'): + if objpath['Version'] != match.group('ver'): raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "Version does not match version part in SoftwareElementID.") evra = "{}:{}-{}.{}".format(*( - (m.group(k) if k != "epoch" or m.group(k) else "0") + (match.group(k) if k != "epoch" or match.group(k) else "0") for k in ("epoch", 'ver', 'rel', 'arch'))) - with YumDB.getInstance(env) as yb: - pl = yb.doPackageLists('installed') - exact, matched, unmatched = yum.packages.parsePackages( - pl.installed, [m.group('name')]) - exact = yum.misc.unique(exact) - for pkg in exact: - if pkg.evra == evra: break - else: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + with YumDB.getInstance(env) as ydb: + pkglist = ydb.doPackageLists('installed') + exact, _, _ = yum.packages.parsePackages( + pkglist.installed, [match.group('name')]) + for pkg in yum.misc.unique(exact): + if pkg.evra != evra: + continue + vpkg = yum.packages._RPMVerifyPackage( + pkg, pkg.hdr.fiFromHeader(), + SoftwareFileCheck.pkg_checksum_type(pkg), [], True) + if not objpath['Name'] in vpkg: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "File not found in RPM package.") + return vpkg + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, "No matching package installed.") - fi = pkg.hdr.fiFromHeader() - vpkg = yum.packages._RPMVerifyPackage(pkg, fi, - SoftwareFileCheck.pkg_checksum_type(pkg), [], True) - if not op['Name'] in vpkg: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "File not found in RPM package.") - return vpkg - - """ - Named tuple to store results of rpm file check as pywbem values, - all results are in form: - (expected, reality) - where - expected is value from rpm package - reality is value obtained from installed file - None means, that value could not be obtained - except for "exists" and "md5_checksum" attributes, where "exists" - is boolean and md5_checksum is a string - for example: - file_check.file_type == (4, 3) - """ + # Named tuple to store results of rpm file check as pywbem values, + # all results are in form: + # (expected, reality) + # where + # expected is value from rpm package + # reality is value obtained from installed file + # None means, that value could not be obtained + # except for "exists" and "md5_checksum" attributes, where "exists" + # is boolean and md5_checksum is a string + # for example: + # file_check.file_type == (4, 3) FileCheck = collections.namedtuple('FileCheck', 'exists, md5_checksum, file_type, file_size, file_mode, ' 'file_checksum, device, link_target, user_id, group_id, ' @@ -688,7 +673,8 @@ class SoftwareFileCheck: else: fstat = os.lstat(vpf.filename) reality = { - "file_type" : SoftwareFileCheck.filetype_mode2pywbem(fstat.st_mode), + "file_type" : SoftwareFileCheck.filetype_mode2pywbem( + fstat.st_mode), "user_id" : pywbem.Uint32(fstat.st_uid), "group_id" : pywbem.Uint32(fstat.st_gid), "file_mode" : pywbem.Uint32(fstat.st_mode), @@ -696,8 +682,8 @@ class SoftwareFileCheck: "last_modification_time" : pywbem.Uint64(fstat.st_mtime) } reality["device"] = (pywbem.Uint64(fstat.st_dev) - if reality['file_type'] == SoftwareFileCheck.filetype_str2pywbem( - "device") else None) + if reality['file_type'] == + SoftwareFileCheck.filetype_str2pywbem("device") else None) reality["link_target"] = (os.readlink(vpf.filename) if os.path.islink(vpf.filename) else None) md5_checksum, checksum = SoftwareFileCheck.compute_checksums( @@ -708,102 +694,119 @@ class SoftwareFileCheck: return SoftwareFileCheck.FileCheck(**kwargs) @staticmethod - def filecheck_passed(fc): - if not isinstance(fc, SoftwareFileCheck.FileCheck): - raise TypeError("fc must be an instance of FileCheck") - return ( fc.exists - and all( v[0] == v[1] - for k, v in fc._asdict().items() if ( - isinstance(v, tuple) - and ( k != "last_modification_time" - or fc.file_type[0] == \ + def filecheck_passed(file_check): + """ + @return True if installed file passed all checks. + """ + if not isinstance(file_check, SoftwareFileCheck.FileCheck): + raise TypeError("file_check must be an instance of FileCheck") + return ( file_check.exists + and all( val[0] == val[1] + #pylint: disable=W0212 + for k, val in file_check._asdict().items() + if ( isinstance(val, tuple) + and ( k != "last_modification_time" + or file_check.file_type[0] == \ SoftwareFileCheck.filetype_str2pywbem("file") - )))) + )))) + + @staticmethod + def _filecheck2model_flags(file_check): + """ + @param file_check is an instance of FileCheck + @return pywbem value for PassedFlags property + """ + flags = [] + for k, value in file_check._asdict().items(): #pylint: disable=W0212 + if isinstance(value, tuple): + if ( k != "last_modification_time" + or file_check.file_type[0] == + SoftwareFileCheck.filetype_str2pywbem('file')): + # last_modification_time check is valid only for + # regular files + flag = file_check.exists and value[0] == value[1] + else: + flag = True + flags.append(flag) + elif isinstance(value, bool): + flags.append(value) + return flags @staticmethod - def filecheck_wrapper(namespace, classname): - - def _filecheck2model_flags(fc): - flags = [] - for k, v in fc._asdict().items(): - if isinstance(v, tuple): - if ( k != "last_modification_time" - or fc.file_type[0] == SoftwareFileCheck.filetype_str2pywbem( - 'file')): - # last_modification_time check is valid only for - # regular files - flag = fc.exists and v[0] == v[1] - else: - flag = True - flags.append(flag) - elif isinstance(v, bool): - flags.append(v) - return flags - - def filecheck2model(vpkg, fn, env, keys_only=True, - model=None, fc=None): - if not isinstance(vpkg, yum.packages._RPMVerifyPackage): - raise TypeError( - "vpkg must be an instance of _RPMVerifyPackage") - if not fn in vpkg: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "File \"%s\" not found among package files" % fn) - if model is None: - model = pywbem.CIMInstanceName(classname, namespace=namespace) - if not keys_only: - model = pywbem.CIMInstance(classname, path=model) - if fc is not None: - if not isinstance(fc, SoftwareFileCheck.FileCheck): - raise TypeError("fc must be an instance of FileCheck") - pkg = vpkg.po - vpf = vpkg._files[fn] - model['Name'] = vpf.filename - model['SoftwareElementID'] = pkg.nevra - model['SoftwareElementState'] = pywbem.Uint16(2) - model['TargetOperatingSystem'] = pywbem.Uint16( - get_target_operating_system()[0]) - model['Version'] = pkg.version - model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename) + def _fill_non_key_values(env, model, pkg, vpf, file_check): + """ + Fills a non key values into instance of SoftwareFileCheck. + """ + model['FileName'] = os.path.basename(vpf.filename) + model['FileChecksumType'] = csumt = \ + pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg)) + if file_check is None: + file_check = SoftwareFileCheck.test_file(env, csumt, vpf) + for mattr, fattr in ( + ('FileType', 'file_type'), + ('FileUserID', 'user_id'), + ('FileGroupID', 'group_id'), + ('FileMode', 'file_mode'), + ('LastModificationTime', 'last_modification_time'), + ('FileSize', 'file_size'), + ('LinkTarget', 'link_target'), + ('FileChecksum', 'file_checksum')): + exp, rea = getattr(file_check, fattr) + if exp is not None: + model['Expected' + mattr] = exp + if rea is not None: + model[mattr] = rea + model['ExpectedFileModeFlags'] = \ + SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[0]) + if file_check.exists: + model['FileModeFlags'] = \ + SoftwareFileCheck.mode2pywbem_flags(file_check.file_mode[1]) + model['FileExists'] = file_check.exists + if file_check.md5_checksum is not None: + model['MD5Checksum'] = file_check.md5_checksum + model['PassedFlags'] = SoftwareFileCheck._filecheck2model_flags( + file_check) + model['PassedFlagsDescriptions'] = list( + SoftwareFileCheck.passed_flags_descriptions) + + @staticmethod + def filecheck2model(vpkg, file_name, env, keys_only=True, + model=None, file_check=None): + """ + @param vpkg is an instance of yum.packages_RPMVerifyPackage + @param file_name a absolute file path contained in package + @param keys_only if True, then only key values will be filed + @param model if given, then this instance will be modified and + returned + @param file_check if not given, it will be computed + @return instance of LMI_SoftwareFileCheck class with all desired + values filed + """ + if not isinstance(vpkg, yum.packages._RPMVerifyPackage): + raise TypeError( + "vpkg must be an instance of _RPMVerifyPackage") + if not file_name in vpkg: + raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, + "File \"%s\" not found among package files" % file_name) + if model is None: + model = pywbem.CIMInstanceName("LMI_SoftwareFileCheck", + namespace="root/cimv2") if not keys_only: - #model['Caption'] = '' # TODO - #model['CheckMode'] = bool() # TODO - #model['CRC1'] = pywbem.Uint32() # TODO - #model['CRC2'] = pywbem.Uint32() # TODO - #model['CreateTimeStamp'] = pywbem.CIMDateTime() # TODO - #model['Description'] = '' # TODO - #model['ElementName'] = '' # TODO - #model['InstanceID'] = '' # TODO - model['FileName'] = os.path.basename(vpf.filename) - model['FileChecksumType'] = csumt = \ - pywbem.Uint16(SoftwareFileCheck.pkg_checksum_type(pkg)) - if fc is None: - fc = SoftwareFileCheck.test_file(env, csumt, vpf) - for mattr, fattr in ( - ('FileType', 'file_type'), - ('FileUserID', 'user_id'), - ('FileGroupID', 'group_id'), - ('FileMode', 'file_mode'), - ('LastModificationTime', 'last_modification_time'), - ('FileSize', 'file_size'), - ('LinkTarget', 'link_target'), - ('FileChecksum', 'file_checksum')): - exp, rea = getattr(fc, fattr) - if exp is not None: - model['Expected' + mattr] = exp - if rea is not None: - model[mattr] = rea - model['ExpectedFileModeFlags'] = \ - SoftwareFileCheck.mode2pywbem_flags(fc.file_mode[0]) - if fc.exists: - model['FileModeFlags'] = SoftwareFileCheck.mode2pywbem_flags( - fc.file_mode[1]) - model['FileExists'] = fc.exists - if fc.md5_checksum is not None: - model['MD5Checksum'] = fc.md5_checksum - model['PassedFlags'] = _filecheck2model_flags(fc) - model['PassedFlagsDescriptions'] = list( - SoftwareFileCheck.passed_flags_descriptions) - return model - - return filecheck2model + model = pywbem.CIMInstance("LMI_SoftwareFileCheck", path=model) + if file_check is not None: + if not isinstance(file_check, SoftwareFileCheck.FileCheck): + raise TypeError("file_check must be an instance of FileCheck") + pkg = vpkg.po + vpf = vpkg._files[file_name] + model['Name'] = vpf.filename + model['SoftwareElementID'] = pkg.nevra + model['SoftwareElementState'] = pywbem.Uint16(2) + model['TargetOperatingSystem'] = pywbem.Uint16( + get_target_operating_system()[0]) + model['Version'] = pkg.version + model['CheckID'] = '%s#%s' % (pkg.name, vpf.filename) + if not keys_only: + SoftwareFileCheck._fill_non_key_values( + env, model, pkg, vpf, file_check) + return model |