diff options
Diffstat (limited to 'src/software/cli/software.py')
-rwxr-xr-x | src/software/cli/software.py | 282 |
1 files changed, 116 insertions, 166 deletions
diff --git a/src/software/cli/software.py b/src/software/cli/software.py index 5368e5f..14c26df 100755 --- a/src/software/cli/software.py +++ b/src/software/cli/software.py @@ -19,20 +19,21 @@ # Authors: Michal Minar <miminar@redhat.com> # +""" +Command line tool for simple software management with OpenLMI CIM software +providers. +""" + import argparse from collections import defaultdict -import os import pywbem -import re import socket -import subprocess import sys -import unittest -re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)' - r'-(?P<rel>.+)\.(?P<arch>[^.]+))$') +from openlmi.software import util +from openlmi.software.core import SystemSoftwareCollection -cim_error2text = defaultdict(lambda: "OTHER_ERROR", { +CIM_ERROR2TEXT = defaultdict(lambda: "OTHER_ERROR", { 1 : "FAILED", 2 : "ACCESS_DENIED", 3 : "INVALID_NAMESPACE", @@ -53,146 +54,74 @@ cim_error2text = defaultdict(lambda: "OTHER_ERROR", { }) class NotFound(Exception): + """ + Exception raised, when desired package could not be found. + """ def __init__(self, package, details=None): - msg = 'Package "%s" not installed!'%package + msg = 'Package "%s" not installed!' % package if details is not None: msg += ' : '+details Exception.__init__(self, msg) -HOSTNAME = None def get_host_name(): - global HOSTNAME - if HOSTNAME is None: - HOSTNAME = socket.gethostname() - return HOSTNAME - -def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'): """ - @param with_epoch may be one of: - "NOT_ZERO" - include epoch only if it's not zero - "ALWAYS" - include epoch always - "NEVER" - do not include epoch at all + @return computer host name """ - estr = '' - if with_epoch.lower() == "always": - estr = epoch - elif with_epoch.lower() == "not_zero": - if epoch != "0": - estr = epoch - if len(estr): estr += ":" - return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch) + if not hasattr(get_host_name, '_hostname'): + get_host_name._hostname = socket.gethostname() #pylint: disable=W0212 + return get_host_name._hostname #pylint: disable=W0212 def make_pkg_path(name, epoch, ver, rel, arch): + """ + @return instance name for LMI_SoftwareIdentity + """ return pywbem.CIMInstanceName( - classname="LMI_SoftwarePackage", namespace="root/cimv2", + classname="LMI_SoftwareIdentity", namespace="root/cimv2", keybindings={ - "Name" : name, - "SoftwareElementID" : make_nevra( - name, epoch, ver, rel, arch, "ALWAYS"), - "SoftwareElementState" : pywbem.Uint16(2), - "TargetOperatingSystem" : pywbem.Uint16(36), - "Version" : ver }) - -def make_inst_pkg_path(*args): - op = pywbem.CIMInstanceName(classname="LMI_SoftwareInstalledPackage", - namespace='root/cimv2') - system_op = pywbem.CIMInstanceName( - classname="CIM_ComputerSystem", namespace="root/cimv2", - keybindings={ - "CreationClassName" : "CIM_ComputerSystem", - "Name" : get_host_name() }) - op["Software"] = make_pkg_path(*args) - op["System"] = system_op - return op - -def get_instance_name(package, is_nevra=False, installed=True): - m = re_nevra.match(package) - if not m and is_nevra: - raise ValueError('Expected a valid nevra!.') - if not m and not installed: - raise ValueError('You must supply a valid nevra!') - if not m: # given only a name of package - # try to enumerate installed packages and find a correct one - inames = conn.EnumerateInstanceNames( - ClassName='LMI_SoftwareInstalledPackage', - namespace='root/cimv2') - for iname in inames: - if iname['Software']['Name'] == package: - break - else: - raise NotFound(package) - else: # nevra given - try: - args = [m.group(k) for k in 'name', 'epoch', 'ver', 'rel', 'arch'] - if not args[1]: # epoch not given - args[1] = '0' - func = (make_pkg_path, make_inst_pkg_path)[1 if installed else 0] - inst = conn.GetInstance(InstanceName=func(*args), LocalOnly=False) - iname = inst.path - except pywbem.CIMError as e: - if pywbem.CIM_ERR_NOT_FOUND: - raise NotFound(package, e.args[1]) - raise - return iname - -def install(conn, nevra): - iname = get_instance_name(nevra, True, False) - (rval, oparms) = conn.InvokeMethod(MethodName='Install', ObjectName=iname) - print (rval, oparms) - -def update(conn, package, epoch=None, version=None, release=None): - iname = get_instance_name(package) - kwargs = dict(MethodName='Update', ObjectName=iname) - for name, val in ( ('Epoch', epoch), ('Version', version) - , ('Release', release)): - if val is None: continue - kwargs[name] = str(val) - (rval, oparms) = conn.InvokeMethod(**kwargs) - print (rval, oparms) - -def remove(conn, package): - iname = get_instance_name(package) - conn.DeleteInstance(iname) - -def verify(conn, package): - iname = get_instance_name(package) - (rval, oparms) = conn.InvokeMethod(MethodName='CheckIntegrity', - ObjectName=iname) - if rval == 0: - print "Passed" - elif rval == 2: - print "Error" - else: # Not Passed - print "Not Passed:" - for f in oparms["Failed"]: - inst = conn.GetInstance(InstanceName=f, LocalOnly=False) - print " %s"%inst["Name"] - if not inst['FileExists']: - print " - does not exist" - else: - for arg in ( 'Checksum', 'LinkTarget', 'FileGroupID' - , 'FileUserID', 'FileMode', 'FileSize' - , 'FileType', 'LastModificationTime'): - if inst['Expected'+arg] != inst[arg]: - print " - %s\t: %s != %s"%(arg, - inst['Expected'+arg], inst[arg]) - -def list_installed(conn): - inames = conn.EnumerateInstanceNames( - ClassName='LMI_SoftwareInstalledPackage', - namespace='root/cimv2') - for nevra in sorted((i['Software']['SoftwareElementID'] for i in inames)): - print nevra - -def list_files(conn, package): - iname = get_instance_name(package) - # TODO: find out, why passing role='Check' failes - inames = conn.ReferenceNames(ObjectName=iname['Software'], - ResultClass='LMI_SoftwarePackageChecks') - for i in inames: - print i['Check']['Name'] - -if __name__ == '__main__': + "InstanceID" : util.make_nevra( + name, epoch, ver, rel, arch, "ALWAYS")}) + +def install(_conn, _nevra): + """Install package by nevra.""" + raise NotImplementedError("Installation is not yet supported!") + +def update(_conn, _package, _epoch=None, _version=None, _release=None): + """Update to particular evr of package.""" + raise NotImplementedError("Update of package is not yet supported!") + +def remove(_conn, _package): + """Remove installed package by its name.""" + raise NotImplementedError("Removal is not yet supported!") + +def verify(_conn, _package): + """Verity installed package by its name.""" + raise NotImplementedError("Verification is not yet supported!") + +def list_available(conn): + """List nevra strings of available packages.""" + inames = conn.AssociatorNames( + ObjectName=SystemSoftwareCollection.get_path(), + ResultClass='LMI_SoftwareIdentity', + AssocClass="LMI_MemberOfSoftwareCollection", + Role="Collection", + ResultRole="Member") + for nevra in (i['InstanceID'] for i in inames): + print nevra[len("LMI:PKG:"):] + +def list_installed(_conn): + """List nevra strings of installed packages.""" + raise NotImplementedError( + "Listing of installed packages is not yet supported!") + +def list_files(_conn, _package): + """List files of installed package.""" + raise NotImplementedError("Listing of package files is not yet supported!") + +def parse_args(): + """ + Parse command line arguments and handle related errors. + @return Namespace object + """ parser = argparse.ArgumentParser(prog='software', description=("CLI tool for testing OpenLMI software providers." " With this tool you are able to install, update," @@ -201,12 +130,12 @@ if __name__ == '__main__': parser.add_argument('--url', help="Specify schema, hostname and port of broker in one argument." " For user and password, please use provided options.") - parser.add_argument('-p', '--port', type=int, default=5988, - help="Port of cimom broker.") + parser.add_argument('-p', '--port', type=int, default=5989, + help="Port of cimom broker. Default is %(default)d.") parser.add_argument('-h', '--hostname', default='localhost', help="Hostname of cimom broker.") parser.add_argument('-s', '--schema', choices=('http', 'https'), - default='http') + default='https', help="Schema part of url (default is %(default)s)") parser.add_argument('-u', '--user', default='', help="Under which user to authenticate.") parser.add_argument('--password', default='', @@ -247,44 +176,65 @@ if __name__ == '__main__': parse_verify.set_defaults(command='verify') parse_list = subpars.add_parser('list', - help="List installed packages.") + help="List various information depending on next argument." + " See \"%(prog)s list --help\".") parse_list.set_defaults(command='list') - parse_files = subpars.add_parser('list-files', - help="List files of installed package.") - parse_files.add_argument('package', + list_subpars = parse_list.add_subparsers(help="What should be listed.") + parse_list_available = list_subpars.add_parser("available", + help="List available packages.") + parse_list_available.set_defaults(list_kind='available') + parse_list_installed = list_subpars.add_parser("installed", + help="List installed packages.") + parse_list_installed.set_defaults(list_kind='installed') + parse_list_files = list_subpars.add_parser("files", + help="List files of installed package.") + parse_list_files.set_defaults(list_kind='files') + parse_list_files.add_argument('package', help="Name or nevra of installed package.") - parse_files.set_defaults(command='list-files') args = parser.parse_args() - if args.url is not None: - url = args.url - else: - url = '%s://%s:%d' % (args.schema, args.hostname, args.port) - HOSTNAME = re.match('^https?://([^:]+)', url).group(1) + if args.url is None: + args.url = '%s://%s:%d' % (args.schema, args.hostname, args.port) + + return args + +def main(args): + """ + Main functionality. + @return return code of script + """ auth = (args.user, args.password) if args.debug: - sys.stderr.write('url:\t%s\n'%url) - conn = pywbem.WBEMConnection(url, auth) + sys.stderr.write('url:\t%s\n'%args.url) + conn = pywbem.WBEMConnection(args.url, auth) - func, attrs = \ - { 'install' : (install, ('nevra',)) - , 'update' : (update, ('package', 'epoch', 'version', 'release')) - , 'remove' : (remove, ('package',)) - , 'verify' : (verify, ('package',)) - , 'list' : (list_installed, tuple()) - , 'list-files' : (list_files, ('package',)) - }[args.command] + if args.command == 'list': + func, attrs = \ + { 'available' : (list_available, tuple()) + , 'installed' : (list_installed, tuple()) + , 'files' : (list_files, ('package')) + }[args.list_kind] + else: + func, attrs = \ + { 'install' : (install, ('nevra',)) + , 'update' : (update, ('package', 'epoch', 'version', 'release')) + , 'remove' : (remove, ('package',)) + , 'verify' : (verify, ('package',)) + }[args.command] try: func(conn, *(getattr(args, a) for a in attrs)) - sys.exit(0) - except pywbem.CIMError as e: + return 0 + except pywbem.CIMError as exc: sys.stderr.write('Failed: %d (%s) - %s\n' % ( - e.args[0], cim_error2text[e.args[0]], - e.args[1].replace('<br>', '\n'))) - sys.exit(1) - except NotFound as e: - sys.stderr.write(str(e) + '\n') - sys.exit(1) + exc.args[0], CIM_ERROR2TEXT[exc.args[0]], + exc.args[1].replace('<br>', '\n'))) + return 1 + except NotFound as exc: + sys.stderr.write(str(exc) + '\n') + return 1 +if __name__ == '__main__': + ARGS = parse_args() + sys.exit(main(ARGS)) |