From bcb2525736d91ff912b08fce8a7f503b034c2ca1 Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Sat, 13 Oct 2012 21:56:03 +0200 Subject: added a cli tool for testing software provider --- src/software/cli/software.py | 271 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 271 insertions(+) create mode 100755 src/software/cli/software.py diff --git a/src/software/cli/software.py b/src/software/cli/software.py new file mode 100755 index 0000000..a8aba16 --- /dev/null +++ b/src/software/cli/software.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python + +import argparse +from collections import defaultdict +import os +import pywbem +import re +import socket +import subprocess +import sys +import unittest + +re_nevra = re.compile(r'^(?P.+)-(?P(?P\d+):(?P[^-]+)' + r'-(?P.+)\.(?P[^.]+))$') + +cim_error2text = defaultdict(lambda: "OTHER_ERROR", { + 1 : "FAILED", + 2 : "ACCESS_DENIED", + 3 : "INVALID_NAMESPACE", + 4 : "INVALID_PARAMETER", + 5 : "INVALID_CLASS", + 6 : "NOT_FOUND", + 7 : "NOT_SUPPORTED", + 8 : "CLASS_HAS_CHILDREN", + 9 : "CLASS_HAS_INSTANCES", + 10 : "INVALID_SUPERCLASS", + 11 : "ALREADY_EXISTS", + 12 : "NO_SUCH_PROPERTY", + 13 : "TYPE_MISMATCH", + 14 : "QUERY_LANGUAGE_NOT_SUPPORTED", + 15 : "INVALID_QUERY", + 16 : "METHOD_NOT_AVAILABLE", + 17 : "METHOD_NOT_FOUND" +}) + +class NotFound(Exception): + def __init__(self, package, details=None): + msg = 'Package "%s" not installed!'%package + if details is not None: + msg += ' : '+details + Exception.__init__(self, msg) + +HOSTNAME = None +def get_host_name(): + global HOSTNAME + if HOSTNAME is None: + HOSTNAME = socket.gethostname() + return HOSTNAME + +def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): + """ + @param with_epoch may be one of: + "NOT_ZERO" - include epoch only if it's not zero + "ALWAYS" - include epoch always + "NEVER" - do not include epoch at all + """ + estr = '' + if with_epoch.lower() == "always": + estr = epoch + elif with_epoch.lower() == "not_zero": + if epoch != "0": + estr = epoch + if len(estr): estr += ":" + return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) + +def make_pkg_path(name, epoch, ver, rel, arch): + return pywbem.CIMInstanceName( + classname="LMI_SoftwarePackage", namespace="root/cimv2", + keybindings={ + "Name" : name, + "SoftwareElementID" : make_nevra( + name, epoch, ver, rel, arch, "ALWAYS"), + "SoftwareElementState" : pywbem.Uint16(2), + "TargetOperatingSystem" : pywbem.Uint16(36), + "Version" : ver }) + +def make_inst_pkg_path(*args): + op = pywbem.CIMInstanceName(classname="LMI_SoftwareInstalledPackage", + namespace='root/cimv2') + system_op = pywbem.CIMInstanceName( + classname="CIM_ComputerSystem", namespace="root/cimv2", + keybindings={ + "CreationClassName" : "CIM_ComputerSystem", + "Name" : get_host_name() }) + op["Software"] = make_pkg_path(*args) + op["System"] = system_op + return op + +def get_instance_name(package, is_nevra=False, installed=True): + m = re_nevra.match(package) + if not m and is_nevra: + raise ValueError('Expected a valid nevra!.') + if not m and not installed: + raise ValueError('You must supply a valid nevra!') + if not m: # given only a name of package + # try to enumerate installed packages and find a correct one + inames = conn.EnumerateInstanceNames( + ClassName='LMI_SoftwareInstalledPackage', + namespace='root/cimv2') + for iname in inames: + if iname['Software']['Name'] == package: + break + else: + raise NotFound(package) + else: # nevra given + try: + args = [m.group(k) for k in 'name', 'epoch', 'ver', 'rel', 'arch'] + if not args[1]: # epoch not given + args[1] = '0' + func = (make_pkg_path, make_inst_pkg_path)[1 if installed else 0] + inst = conn.GetInstance(InstanceName=func(*args), LocalOnly=False) + iname = inst.path + except pywbem.CIMError as e: + if pywbem.CIM_ERR_NOT_FOUND: + raise NotFound(package, e.args[1]) + raise + return iname + +def install(conn, nevra): + iname = get_instance_name(nevra, True, False) + (rval, oparms) = conn.InvokeMethod(MethodName='Install', ObjectName=iname) + print (rval, oparms) + +def update(conn, package, epoch=None, version=None, release=None): + iname = get_instance_name(package) + kwargs = dict(MethodName='Update', ObjectName=iname) + for name, val in ( ('Epoch', epoch), ('Version', version) + , ('Release', release)): + if val is None: continue + kwargs[name] = str(val) + (rval, oparms) = conn.InvokeMethod(**kwargs) + print (rval, oparms) + +def remove(conn, package): + iname = get_instance_name(package) + conn.DeleteInstance(iname) + +def verify(conn, package): + iname = get_instance_name(package) + (rval, oparms) = conn.InvokeMethod(MethodName='CheckIntegrity', + ObjectName=iname) + if rval == 0: + print "Passed" + elif rval == 2: + print "Error" + else: # Not Passed + print "Not Passed:" + for f in oparms["Failed"]: + inst = conn.GetInstance(InstanceName=f, LocalOnly=False) + print " %s"%inst["Name"] + if not inst['FileExists']: + print " - does not exist" + else: + for arg in ( 'Checksum', 'LinkTarget', 'FileGroupID' + , 'FileUserID', 'FileMode', 'FileSize' + , 'FileType', 'LastModificationTime'): + if inst['Expected'+arg] != inst[arg]: + print " - %s\t: %s != %s"%(arg, + inst['Expected'+arg], inst[arg]) + +def list_installed(conn): + inames = conn.EnumerateInstanceNames( + ClassName='LMI_SoftwareInstalledPackage', + namespace='root/cimv2') + for nevra in sorted((i['Software']['SoftwareElementID'] for i in inames)): + print nevra + +def list_files(conn, package): + iname = get_instance_name(package) + # TODO: find out, why passing role='Check' failes + inames = conn.ReferenceNames(ObjectName=iname['Software'], + ResultClass='LMI_SoftwarePackageChecks') + for i in inames: + print i['Check']['Name'] + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog='software', + description=("CLI tool for testing cura software providers." + " With this tool you are able to install, update," + " remove and verify particular package."), + conflict_handler="resolve") + parser.add_argument('--url', + help="Specify schema, hostname and port of broker in one argument." + " For user and password, please use provided options.") + parser.add_argument('-p', '--port', type=int, default=5988, + help="Port of cimom broker.") + parser.add_argument('-h', '--hostname', + default='localhost', help="Hostname of cimom broker.") + parser.add_argument('-s', '--schema', choices=('http', 'https'), + default='http') + parser.add_argument('-u', '--user', default='', + help="Under which user to authenticate.") + parser.add_argument('--password', default='', + help="User password.") + parser.add_argument('-d', '--debug', action='store_true', default=False, + help="Print debugging informations.") + subpars = parser.add_subparsers(help="Action to make on cura providers.") + + parse_install = subpars.add_parser('install', + help="Install specific available package.") + parse_install.add_argument('nevra', help="Name, epoch, version, release" + " and architecture of package to install given as:\n" + " name-epoch:version-release.arch") + parse_install.set_defaults(command='install') + + parse_update = subpars.add_parser('update', + help="Update a package to specific evra or to the newest available.") + parse_update.add_argument('package', + help="Name or nevra of installed package.") + parse_update.add_argument('-e', '--epoch', + type=int, help="Filter available packages for update by epoch.") + parse_update.add_argument('-v', '--version', + help="Filter available packages for update by version.") + parse_update.add_argument('-r', '--release', + help="Filter available packages for update by release.") + parse_update.set_defaults(command='update') + + parse_remove = subpars.add_parser('remove', + help="Remove installed package.") + parse_remove.add_argument('package', + help="Name or nevra of installed package.") + parse_remove.set_defaults(command='remove') + + parse_verify = subpars.add_parser('verify', + help="Verify installed package.") + parse_verify.add_argument('package', + help="Name or nevra of installed package to verify.") + parse_verify.set_defaults(command='verify') + + parse_list = subpars.add_parser('list', + help="List installed packages.") + parse_list.set_defaults(command='list') + + parse_files = subpars.add_parser('list-files', + help="List files of installed package.") + parse_files.add_argument('package', + help="Name or nevra of installed package.") + parse_files.set_defaults(command='list-files') + + args = parser.parse_args() + + if args.url is not None: + url = args.url + else: + url = '%s://%s:%d' % (args.schema, args.hostname, args.port) + HOSTNAME = re.match('^https?://([^:]+)', url).group(1) + auth = (args.user, args.password) + if args.debug: + sys.stderr.write('url:\t%s\n'%url) + conn = pywbem.WBEMConnection(url, auth) + + func, attrs = \ + { 'install' : (install, ('nevra',)) + , 'update' : (update, ('package', 'epoch', 'version', 'release')) + , 'remove' : (remove, ('package',)) + , 'verify' : (verify, ('package',)) + , 'list' : (list_installed, tuple()) + , 'list-files' : (list_files, ('package',)) + }[args.command] + try: + func(conn, *(getattr(args, a) for a in attrs)) + sys.exit(0) + except pywbem.CIMError as e: + sys.stderr.write('Failed: %d (%s) - %s\n' % ( + e.args[0], cim_error2text[e.args[0]], + e.args[1].replace('
', '\n'))) + sys.exit(1) + except NotFound as e: + sys.stderr.write(str(e) + '\n') + sys.exit(1) + -- cgit