summaryrefslogtreecommitdiffstats
path: root/src/software/cli/software.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/cli/software.py')
-rwxr-xr-xsrc/software/cli/software.py282
1 files changed, 116 insertions, 166 deletions
diff --git a/src/software/cli/software.py b/src/software/cli/software.py
index 5368e5f..14c26df 100755
--- a/src/software/cli/software.py
+++ b/src/software/cli/software.py
@@ -19,20 +19,21 @@
# Authors: Michal Minar <miminar@redhat.com>
#
+"""
+Command line tool for simple software management with OpenLMI CIM software
+providers.
+"""
+
import argparse
from collections import defaultdict
-import os
import pywbem
-import re
import socket
-import subprocess
import sys
-import unittest
-re_nevra = re.compile(r'^(?P<name>.+)-(?P<evra>(?P<epoch>\d+):(?P<ver>[^-]+)'
- r'-(?P<rel>.+)\.(?P<arch>[^.]+))$')
+from openlmi.software import util
+from openlmi.software.core import SystemSoftwareCollection
-cim_error2text = defaultdict(lambda: "OTHER_ERROR", {
+CIM_ERROR2TEXT = defaultdict(lambda: "OTHER_ERROR", {
1 : "FAILED",
2 : "ACCESS_DENIED",
3 : "INVALID_NAMESPACE",
@@ -53,146 +54,74 @@ cim_error2text = defaultdict(lambda: "OTHER_ERROR", {
})
class NotFound(Exception):
+ """
+ Exception raised, when desired package could not be found.
+ """
def __init__(self, package, details=None):
- msg = 'Package "%s" not installed!'%package
+ msg = 'Package "%s" not installed!' % package
if details is not None:
msg += ' : '+details
Exception.__init__(self, msg)
-HOSTNAME = None
def get_host_name():
- global HOSTNAME
- if HOSTNAME is None:
- HOSTNAME = socket.gethostname()
- return HOSTNAME
-
-def make_nevra(name, epoch, ver, rel, arch, with_epoch='NOT_ZERO'):
"""
- @param with_epoch may be one of:
- "NOT_ZERO" - include epoch only if it's not zero
- "ALWAYS" - include epoch always
- "NEVER" - do not include epoch at all
+ @return computer host name
"""
- estr = ''
- if with_epoch.lower() == "always":
- estr = epoch
- elif with_epoch.lower() == "not_zero":
- if epoch != "0":
- estr = epoch
- if len(estr): estr += ":"
- return "%s-%s%s-%s.%s" % (name, estr, ver, rel, arch)
+ if not hasattr(get_host_name, '_hostname'):
+ get_host_name._hostname = socket.gethostname() #pylint: disable=W0212
+ return get_host_name._hostname #pylint: disable=W0212
def make_pkg_path(name, epoch, ver, rel, arch):
+ """
+ @return instance name for LMI_SoftwareIdentity
+ """
return pywbem.CIMInstanceName(
- classname="LMI_SoftwarePackage", namespace="root/cimv2",
+ classname="LMI_SoftwareIdentity", namespace="root/cimv2",
keybindings={
- "Name" : name,
- "SoftwareElementID" : make_nevra(
- name, epoch, ver, rel, arch, "ALWAYS"),
- "SoftwareElementState" : pywbem.Uint16(2),
- "TargetOperatingSystem" : pywbem.Uint16(36),
- "Version" : ver })
-
-def make_inst_pkg_path(*args):
- op = pywbem.CIMInstanceName(classname="LMI_SoftwareInstalledPackage",
- namespace='root/cimv2')
- system_op = pywbem.CIMInstanceName(
- classname="CIM_ComputerSystem", namespace="root/cimv2",
- keybindings={
- "CreationClassName" : "CIM_ComputerSystem",
- "Name" : get_host_name() })
- op["Software"] = make_pkg_path(*args)
- op["System"] = system_op
- return op
-
-def get_instance_name(package, is_nevra=False, installed=True):
- m = re_nevra.match(package)
- if not m and is_nevra:
- raise ValueError('Expected a valid nevra!.')
- if not m and not installed:
- raise ValueError('You must supply a valid nevra!')
- if not m: # given only a name of package
- # try to enumerate installed packages and find a correct one
- inames = conn.EnumerateInstanceNames(
- ClassName='LMI_SoftwareInstalledPackage',
- namespace='root/cimv2')
- for iname in inames:
- if iname['Software']['Name'] == package:
- break
- else:
- raise NotFound(package)
- else: # nevra given
- try:
- args = [m.group(k) for k in 'name', 'epoch', 'ver', 'rel', 'arch']
- if not args[1]: # epoch not given
- args[1] = '0'
- func = (make_pkg_path, make_inst_pkg_path)[1 if installed else 0]
- inst = conn.GetInstance(InstanceName=func(*args), LocalOnly=False)
- iname = inst.path
- except pywbem.CIMError as e:
- if pywbem.CIM_ERR_NOT_FOUND:
- raise NotFound(package, e.args[1])
- raise
- return iname
-
-def install(conn, nevra):
- iname = get_instance_name(nevra, True, False)
- (rval, oparms) = conn.InvokeMethod(MethodName='Install', ObjectName=iname)
- print (rval, oparms)
-
-def update(conn, package, epoch=None, version=None, release=None):
- iname = get_instance_name(package)
- kwargs = dict(MethodName='Update', ObjectName=iname)
- for name, val in ( ('Epoch', epoch), ('Version', version)
- , ('Release', release)):
- if val is None: continue
- kwargs[name] = str(val)
- (rval, oparms) = conn.InvokeMethod(**kwargs)
- print (rval, oparms)
-
-def remove(conn, package):
- iname = get_instance_name(package)
- conn.DeleteInstance(iname)
-
-def verify(conn, package):
- iname = get_instance_name(package)
- (rval, oparms) = conn.InvokeMethod(MethodName='CheckIntegrity',
- ObjectName=iname)
- if rval == 0:
- print "Passed"
- elif rval == 2:
- print "Error"
- else: # Not Passed
- print "Not Passed:"
- for f in oparms["Failed"]:
- inst = conn.GetInstance(InstanceName=f, LocalOnly=False)
- print " %s"%inst["Name"]
- if not inst['FileExists']:
- print " - does not exist"
- else:
- for arg in ( 'Checksum', 'LinkTarget', 'FileGroupID'
- , 'FileUserID', 'FileMode', 'FileSize'
- , 'FileType', 'LastModificationTime'):
- if inst['Expected'+arg] != inst[arg]:
- print " - %s\t: %s != %s"%(arg,
- inst['Expected'+arg], inst[arg])
-
-def list_installed(conn):
- inames = conn.EnumerateInstanceNames(
- ClassName='LMI_SoftwareInstalledPackage',
- namespace='root/cimv2')
- for nevra in sorted((i['Software']['SoftwareElementID'] for i in inames)):
- print nevra
-
-def list_files(conn, package):
- iname = get_instance_name(package)
- # TODO: find out, why passing role='Check' failes
- inames = conn.ReferenceNames(ObjectName=iname['Software'],
- ResultClass='LMI_SoftwarePackageChecks')
- for i in inames:
- print i['Check']['Name']
-
-if __name__ == '__main__':
+ "InstanceID" : util.make_nevra(
+ name, epoch, ver, rel, arch, "ALWAYS")})
+
+def install(_conn, _nevra):
+ """Install package by nevra."""
+ raise NotImplementedError("Installation is not yet supported!")
+
+def update(_conn, _package, _epoch=None, _version=None, _release=None):
+ """Update to particular evr of package."""
+ raise NotImplementedError("Update of package is not yet supported!")
+
+def remove(_conn, _package):
+ """Remove installed package by its name."""
+ raise NotImplementedError("Removal is not yet supported!")
+
+def verify(_conn, _package):
+ """Verity installed package by its name."""
+ raise NotImplementedError("Verification is not yet supported!")
+
+def list_available(conn):
+ """List nevra strings of available packages."""
+ inames = conn.AssociatorNames(
+ ObjectName=SystemSoftwareCollection.get_path(),
+ ResultClass='LMI_SoftwareIdentity',
+ AssocClass="LMI_MemberOfSoftwareCollection",
+ Role="Collection",
+ ResultRole="Member")
+ for nevra in (i['InstanceID'] for i in inames):
+ print nevra[len("LMI:PKG:"):]
+
+def list_installed(_conn):
+ """List nevra strings of installed packages."""
+ raise NotImplementedError(
+ "Listing of installed packages is not yet supported!")
+
+def list_files(_conn, _package):
+ """List files of installed package."""
+ raise NotImplementedError("Listing of package files is not yet supported!")
+
+def parse_args():
+ """
+ Parse command line arguments and handle related errors.
+ @return Namespace object
+ """
parser = argparse.ArgumentParser(prog='software',
description=("CLI tool for testing OpenLMI software providers."
" With this tool you are able to install, update,"
@@ -201,12 +130,12 @@ if __name__ == '__main__':
parser.add_argument('--url',
help="Specify schema, hostname and port of broker in one argument."
" For user and password, please use provided options.")
- parser.add_argument('-p', '--port', type=int, default=5988,
- help="Port of cimom broker.")
+ parser.add_argument('-p', '--port', type=int, default=5989,
+ help="Port of cimom broker. Default is %(default)d.")
parser.add_argument('-h', '--hostname',
default='localhost', help="Hostname of cimom broker.")
parser.add_argument('-s', '--schema', choices=('http', 'https'),
- default='http')
+ default='https', help="Schema part of url (default is %(default)s)")
parser.add_argument('-u', '--user', default='',
help="Under which user to authenticate.")
parser.add_argument('--password', default='',
@@ -247,44 +176,65 @@ if __name__ == '__main__':
parse_verify.set_defaults(command='verify')
parse_list = subpars.add_parser('list',
- help="List installed packages.")
+ help="List various information depending on next argument."
+ " See \"%(prog)s list --help\".")
parse_list.set_defaults(command='list')
- parse_files = subpars.add_parser('list-files',
- help="List files of installed package.")
- parse_files.add_argument('package',
+ list_subpars = parse_list.add_subparsers(help="What should be listed.")
+ parse_list_available = list_subpars.add_parser("available",
+ help="List available packages.")
+ parse_list_available.set_defaults(list_kind='available')
+ parse_list_installed = list_subpars.add_parser("installed",
+ help="List installed packages.")
+ parse_list_installed.set_defaults(list_kind='installed')
+ parse_list_files = list_subpars.add_parser("files",
+ help="List files of installed package.")
+ parse_list_files.set_defaults(list_kind='files')
+ parse_list_files.add_argument('package',
help="Name or nevra of installed package.")
- parse_files.set_defaults(command='list-files')
args = parser.parse_args()
- if args.url is not None:
- url = args.url
- else:
- url = '%s://%s:%d' % (args.schema, args.hostname, args.port)
- HOSTNAME = re.match('^https?://([^:]+)', url).group(1)
+ if args.url is None:
+ args.url = '%s://%s:%d' % (args.schema, args.hostname, args.port)
+
+ return args
+
+def main(args):
+ """
+ Main functionality.
+ @return return code of script
+ """
auth = (args.user, args.password)
if args.debug:
- sys.stderr.write('url:\t%s\n'%url)
- conn = pywbem.WBEMConnection(url, auth)
+ sys.stderr.write('url:\t%s\n'%args.url)
+ conn = pywbem.WBEMConnection(args.url, auth)
- func, attrs = \
- { 'install' : (install, ('nevra',))
- , 'update' : (update, ('package', 'epoch', 'version', 'release'))
- , 'remove' : (remove, ('package',))
- , 'verify' : (verify, ('package',))
- , 'list' : (list_installed, tuple())
- , 'list-files' : (list_files, ('package',))
- }[args.command]
+ if args.command == 'list':
+ func, attrs = \
+ { 'available' : (list_available, tuple())
+ , 'installed' : (list_installed, tuple())
+ , 'files' : (list_files, ('package'))
+ }[args.list_kind]
+ else:
+ func, attrs = \
+ { 'install' : (install, ('nevra',))
+ , 'update' : (update, ('package', 'epoch', 'version', 'release'))
+ , 'remove' : (remove, ('package',))
+ , 'verify' : (verify, ('package',))
+ }[args.command]
try:
func(conn, *(getattr(args, a) for a in attrs))
- sys.exit(0)
- except pywbem.CIMError as e:
+ return 0
+ except pywbem.CIMError as exc:
sys.stderr.write('Failed: %d (%s) - %s\n' % (
- e.args[0], cim_error2text[e.args[0]],
- e.args[1].replace('<br>', '\n')))
- sys.exit(1)
- except NotFound as e:
- sys.stderr.write(str(e) + '\n')
- sys.exit(1)
+ exc.args[0], CIM_ERROR2TEXT[exc.args[0]],
+ exc.args[1].replace('<br>', '\n')))
+ return 1
+ except NotFound as exc:
+ sys.stderr.write(str(exc) + '\n')
+ return 1
+if __name__ == '__main__':
+ ARGS = parse_args()
+ sys.exit(main(ARGS))