#!/usr/bin/env python
"""
CLI tool for testing cura software providers through a CIMOM broker.

Supported actions: install, update, remove, verify, list and list-files.
All communication with the broker goes through a pywbem.WBEMConnection.
"""

import argparse
from collections import defaultdict
import os
import re
import socket
import subprocess
import sys
import unittest

import pywbem

# Matches "name-epoch:version-release.arch".
# NOTE(review): the group names were lost in the previous revision of this
# file (the pattern read "(?P.+)", which does not compile). "name", "epoch",
# "ver", "rel" and "arch" are the names looked up in get_instance_name();
# "evra" is a best guess for the enclosing group and is not read anywhere
# in this file.
re_nevra = re.compile(
    r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
    r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')

# Maps the numeric code of a CIMError to a short symbolic name; any code
# missing from the table yields "OTHER_ERROR".
cim_error2text = defaultdict(lambda: "OTHER_ERROR", {
    1: "FAILED",
    2: "ACCESS_DENIED",
    3: "INVALID_NAMESPACE",
    4: "INVALID_PARAMETER",
    5: "INVALID_CLASS",
    6: "NOT_FOUND",
    7: "NOT_SUPPORTED",
    8: "CLASS_HAS_CHILDREN",
    9: "CLASS_HAS_INSTANCES",
    10: "INVALID_SUPERCLASS",
    11: "ALREADY_EXISTS",
    12: "NO_SUCH_PROPERTY",
    13: "TYPE_MISMATCH",
    14: "QUERY_LANGUAGE_NOT_SUPPORTED",
    15: "INVALID_QUERY",
    16: "METHOD_NOT_AVAILABLE",
    17: "METHOD_NOT_FOUND",
})


class NotFound(Exception):
    """Raised when the requested package could not be found on the broker."""

    def __init__(self, package, details=None):
        """
        @param package name or nevra of the package that was looked up
        @param details optional text appended to the message after " : "
        """
        msg = 'Package "%s" not installed!' % package
        if details is not None:
            msg += ' : ' + details
        Exception.__init__(self, msg)


# Host name used for the "Name" key of CIM_ComputerSystem. main() sets it
# from the broker url; otherwise it is filled lazily by get_host_name().
HOSTNAME = None


def get_host_name():
    """
    Return the cached host name, asking socket.gethostname() on first use.
    """
    global HOSTNAME
    if HOSTNAME is None:
        HOSTNAME = socket.gethostname()
    return HOSTNAME


def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
    """
    Build a "name-[epoch:]ver-rel.arch" string.

    @param with_epoch may be one of (case insensitive):
        "NOT_ZERO" - include epoch only if it's not zero
        "ALWAYS"   - include epoch always
        "NEVER"    - do not include epoch at all
      Any other value behaves like "NEVER".
    @return the composed nevra string
    """
    mode = with_epoch.lower()
    estr = ''
    if mode == "always":
        estr = epoch
    elif mode == "not_zero" and epoch != "0":
        estr = epoch
    if estr:
        estr += ":"
    return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)


def make_pkg_path(name, epoch, ver, rel, arch):
    """
    Build the object path of an LMI_SoftwarePackage in root/cimv2.

    The SoftwareElementID key is the nevra with the epoch always included.

    @return pywbem.CIMInstanceName
    """
    return pywbem.CIMInstanceName(
        classname="LMI_SoftwarePackage",
        namespace="root/cimv2",
        keybindings={
            "Name": name,
            "SoftwareElementID": make_nevra(
                name, epoch, ver, rel, arch, "ALWAYS"),
            # NOTE(review): 2 and 36 are presumably the CIM value-map codes
            # for "Executable" and "LINUX" - confirm against the schema.
            "SoftwareElementState": pywbem.Uint16(2),
            "TargetOperatingSystem": pywbem.Uint16(36),
            "Version": ver,
        })


def make_inst_pkg_path(*args):
    """
    Build the object path of an LMI_SoftwareInstalledPackage in root/cimv2.

    @param args passed unchanged to make_pkg_path()
        (name, epoch, ver, rel, arch)
    @return pywbem.CIMInstanceName whose "Software" key is the package path
        and whose "System" key is a CIM_ComputerSystem path named after
        get_host_name()
    """
    op = pywbem.CIMInstanceName(
        classname="LMI_SoftwareInstalledPackage",
        namespace='root/cimv2')
    system_op = pywbem.CIMInstanceName(
        classname="CIM_ComputerSystem",
        namespace="root/cimv2",
        keybindings={
            "CreationClassName": "CIM_ComputerSystem",
            "Name": get_host_name(),
        })
    op["Software"] = make_pkg_path(*args)
    op["System"] = system_op
    return op


def get_instance_name(package, is_nevra=False, installed=True, conn=None):
    """
    Resolve a package name or nevra to an object path known to the broker.

    @param package name of an installed package, or a nevra
    @param is_nevra when True, package must be a valid nevra
    @param installed when True an LMI_SoftwareInstalledPackage is looked up,
        otherwise an LMI_SoftwarePackage (which requires a nevra)
    @param conn broker connection to use; when omitted, a module-level
        global named "conn" is used (the behaviour of older revisions)
    @return object path of the matching instance
    @raise ValueError if a nevra is required but package is not one
    @raise NotFound if no matching instance exists
    @raise pywbem.CIMError for any other broker failure
    """
    if conn is None:
        conn = globals().get('conn')
        if conn is None:
            raise RuntimeError('No connection to broker available.')

    m = re_nevra.match(package)
    if not m:
        if is_nevra:
            raise ValueError('Expected a valid nevra!.')
        if not installed:
            raise ValueError('You must supply a valid nevra!')
        # given only a name of package
        # try to enumerate installed packages and find a correct one
        inames = conn.EnumerateInstanceNames(
            ClassName='LMI_SoftwareInstalledPackage',
            namespace='root/cimv2')
        for iname in inames:
            if iname['Software']['Name'] == package:
                return iname
        raise NotFound(package)

    # nevra given
    args = [m.group(k) for k in ('name', 'epoch', 'ver', 'rel', 'arch')]
    if not args[1]:  # epoch not given
        args[1] = '0'
    make_path = make_inst_pkg_path if installed else make_pkg_path
    try:
        inst = conn.GetInstance(InstanceName=make_path(*args),
                                LocalOnly=False)
    except pywbem.CIMError as e:
        # Only a genuine "not found" answer is translated; the previous
        # code tested the (always true) constant itself and therefore
        # reported every broker error as a missing package.
        if e.args[0] == pywbem.CIM_ERR_NOT_FOUND:
            raise NotFound(package, e.args[1])
        raise
    return inst.path


def install(conn, nevra):
    """
    Invoke the "Install" method on the available package given by nevra
    and print the returned (value, output parameters) pair.
    """
    iname = get_instance_name(nevra, True, False, conn=conn)
    (rval, oparms) = conn.InvokeMethod(MethodName='Install',
                                       ObjectName=iname)
    print((rval, oparms))


def update(conn, package, epoch=None, version=None, release=None):
    """
    Invoke the "Update" method on an installed package and print the
    returned (value, output parameters) pair.

    @param epoch, version, release optional filters; those that are not
        None are passed to the method as strings
    """
    iname = get_instance_name(package, conn=conn)
    kwargs = dict(MethodName='Update', ObjectName=iname)
    filters = (('Epoch', epoch), ('Version', version), ('Release', release))
    for name, val in filters:
        if val is not None:
            kwargs[name] = str(val)
    (rval, oparms) = conn.InvokeMethod(**kwargs)
    print((rval, oparms))


def remove(conn, package):
    """Delete the instance of an installed package."""
    iname = get_instance_name(package, conn=conn)
    conn.DeleteInstance(iname)


def verify(conn, package):
    """
    Invoke "CheckIntegrity" on an installed package and print the outcome.

    A return value of 0 prints "Passed" and 2 prints "Error". Any other
    value prints every file listed in the "Failed" output parameter
    together with the attributes that differ from their expected values.
    """
    iname = get_instance_name(package, conn=conn)
    (rval, oparms) = conn.InvokeMethod(MethodName='CheckIntegrity',
                                       ObjectName=iname)
    if rval == 0:
        print("Passed")
        return
    if rval == 2:
        print("Error")
        return

    # Not Passed
    checked = ('Checksum', 'LinkTarget', 'FileGroupID', 'FileUserID',
               'FileMode', 'FileSize', 'FileType', 'LastModificationTime')
    print("Not Passed:")
    for failed in oparms["Failed"]:
        inst = conn.GetInstance(InstanceName=failed, LocalOnly=False)
        print(" %s" % inst["Name"])
        if not inst['FileExists']:
            print(" - does not exist")
            continue
        for arg in checked:
            if inst['Expected' + arg] != inst[arg]:
                print(" - %s\t: %s != %s" % (
                    arg, inst['Expected' + arg], inst[arg]))


def list_installed(conn):
    """Print the SoftwareElementID of every installed package, sorted."""
    inames = conn.EnumerateInstanceNames(
        ClassName='LMI_SoftwareInstalledPackage',
        namespace='root/cimv2')
    for nevra in sorted(i['Software']['SoftwareElementID'] for i in inames):
        print(nevra)


def list_files(conn, package):
    """Print the name of every file check associated with a package."""
    iname = get_instance_name(package, conn=conn)
    # TODO: find out, why passing role='Check' fails
    inames = conn.ReferenceNames(ObjectName=iname['Software'],
                                 ResultClass='LMI_SoftwarePackageChecks')
    for i in inames:
        print(i['Check']['Name'])


def _build_parser():
    """Create the command line parser with one sub-parser per action."""
    # conflict_handler="resolve" lets -h mean --hostname instead of --help.
    parser = argparse.ArgumentParser(
        prog='software',
        description=("CLI tool for testing cura software providers."
                     " With this tool you are able to install, update,"
                     " remove and verify particular package."),
        conflict_handler="resolve")
    parser.add_argument(
        '--url',
        help="Specify schema, hostname and port of broker in one argument."
             " For user and password, please use provided options.")
    parser.add_argument('-p', '--port', type=int, default=5988,
                        help="Port of cimom broker.")
    parser.add_argument('-h', '--hostname', default='localhost',
                        help="Hostname of cimom broker.")
    parser.add_argument('-s', '--schema', choices=('http', 'https'),
                        default='http')
    parser.add_argument('-u', '--user', default='',
                        help="Under which user to authenticate.")
    parser.add_argument('--password', default='',
                        help="User password.")
    parser.add_argument('-d', '--debug', action='store_true', default=False,
                        help="Print debugging informations.")

    subpars = parser.add_subparsers(
        help="Action to make on cura providers.", dest='command')
    # Python 3 makes sub-commands optional by default; without this a
    # missing action would surface later as a missing args.command.
    subpars.required = True

    parse_install = subpars.add_parser(
        'install', help="Install specific available package.")
    parse_install.add_argument(
        'nevra',
        help="Name, epoch, version, release"
             " and architecture of package to install given as:\n"
             " name-epoch:version-release.arch")
    parse_install.set_defaults(command='install')

    parse_update = subpars.add_parser(
        'update',
        help="Update a package to specific evra or to the newest available.")
    parse_update.add_argument(
        'package', help="Name or nevra of installed package.")
    parse_update.add_argument(
        '-e', '--epoch', type=int,
        help="Filter available packages for update by epoch.")
    parse_update.add_argument(
        '-v', '--version',
        help="Filter available packages for update by version.")
    parse_update.add_argument(
        '-r', '--release',
        help="Filter available packages for update by release.")
    parse_update.set_defaults(command='update')

    parse_remove = subpars.add_parser(
        'remove', help="Remove installed package.")
    parse_remove.add_argument(
        'package', help="Name or nevra of installed package.")
    parse_remove.set_defaults(command='remove')

    parse_verify = subpars.add_parser(
        'verify', help="Verify installed package.")
    parse_verify.add_argument(
        'package', help="Name or nevra of installed package to verify.")
    parse_verify.set_defaults(command='verify')

    parse_list = subpars.add_parser(
        'list', help="List installed packages.")
    parse_list.set_defaults(command='list')

    parse_files = subpars.add_parser(
        'list-files', help="List files of installed package.")
    parse_files.add_argument(
        'package', help="Name or nevra of installed package.")
    parse_files.set_defaults(command='list-files')

    return parser


def main(argv=None):
    """
    Parse the command line, connect to the broker and run the action.

    @param argv argument list; None makes argparse read sys.argv
    @return process exit code: 0 on success, 1 on a reported failure
    """
    global HOSTNAME

    args = _build_parser().parse_args(argv)
    if args.url is not None:
        url = args.url
    else:
        url = '%s://%s:%d' % (args.schema, args.hostname, args.port)

    # Use the broker's host for CIM_ComputerSystem.Name. When the url does
    # not look like http(s)://host..., HOSTNAME stays unset and
    # get_host_name() falls back to the local host name instead of crashing.
    host_match = re.match(r'^https?://([^:/]+)', url)
    if host_match:
        HOSTNAME = host_match.group(1)

    auth = (args.user, args.password)
    if args.debug:
        sys.stderr.write('url:\t%s\n' % url)
    conn = pywbem.WBEMConnection(url, auth)

    # action name -> (handler, names of parsed arguments passed after conn)
    actions = {
        'install': (install, ('nevra',)),
        'update': (update, ('package', 'epoch', 'version', 'release')),
        'remove': (remove, ('package',)),
        'verify': (verify, ('package',)),
        'list': (list_installed, tuple()),
        'list-files': (list_files, ('package',)),
    }
    func, attrs = actions[args.command]

    try:
        func(conn, *[getattr(args, a) for a in attrs])
    except pywbem.CIMError as e:
        # NOTE(review): the search string of replace() was lost in the
        # previous revision (it contained a raw line break, which is a
        # syntax error). '<br>' is the most plausible original - confirm
        # against the broker's error descriptions.
        description = (e.args[1] or '').replace('<br>', '\n')
        sys.stderr.write('Failed: %d (%s) - %s\n' % (
            e.args[0], cim_error2text[e.args[0]], description))
        return 1
    except (NotFound, ValueError) as e:
        # ValueError is raised by get_instance_name() for a malformed nevra.
        sys.stderr.write(str(e) + '\n')
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(main())