From 231f0bd65aec9cc0767bd6a76d5aa5b27dd37168 Mon Sep 17 00:00:00 2001 From: Jason Gerard DeRose Date: Wed, 28 Jan 2009 13:05:26 -0700 Subject: Finished reworked cli.CLI class into cli.cli plugin --- ipalib/cli.py | 404 ++++++++-------------------------------------------------- 1 file changed, 50 insertions(+), 354 deletions(-) (limited to 'ipalib/cli.py') diff --git a/ipalib/cli.py b/ipalib/cli.py index 4377a048..5d5bdc34 100644 --- a/ipalib/cli.py +++ b/ipalib/cli.py @@ -413,9 +413,9 @@ class help(frontend.Command): name = from_cli(key) if name not in self.Command: raise HelpError(topic=key) - cmd = self.application[key] + cmd = self.Command[name] print 'Purpose: %s' % cmd.doc - self.application.build_parser(cmd).print_help() + self.Backend.cli.build_parser(cmd).print_help() def print_commands(self): mcl = self.get_mcl() @@ -503,195 +503,38 @@ cli_application_commands = ( class Collector(object): - def __init__(self, **extra): + def __init__(self): object.__setattr__(self, '_Collector__options', {}) - object.__setattr__(self, '_Collector__extra', frozenset(extra)) - for (key, value) in extra.iteritems(): - object.__setattr__(self, key, value) def __setattr__(self, name, value): - if name not in self.__extra: - if name in self.__options: - v = self.__options[name] - if type(v) is tuple: - value = v + (value,) - else: - value = (v, value) - self.__options[name] = value + if name in self.__options: + v = self.__options[name] + if type(v) is tuple: + value = v + (value,) + else: + value = (v, value) + self.__options[name] = value object.__setattr__(self, name, value) def __todict__(self): return dict(self.__options) -class CLI(object): +class cli(backend.Executioner): """ - All logic for dispatching over command line interface. + Backend plugin for executing from command line interface. """ - __d = None - __mcl = None - - def __init__(self, api, argv): - self.api = api - self.argv = tuple(argv) - self.__done = set() - - def run(self): - """ - Call `CLI.run_real` in a try/except. - """ - self.bootstrap() - try: - self.run_real() - except KeyboardInterrupt: - print '' - self.api.log.info('operation aborted') - sys.exit() - except PublicError, e: - self.api.log.error(e.strerror) - sys.exit(e.errno) - except Exception, e: - self.api.log.exception('%s: %s', e.__class__.__name__, str(e)) - e = InternalError() - self.api.log.error(e.strerror) - sys.exit(e.errno) - - def run_real(self): - """ - Parse ``argv`` and potentially run a command. - - This method requires several initialization steps to be completed - first, all of which all automatically called with a single call to - `CLI.finalize()`. The initialization steps are broken into separate - methods simply to make it easy to write unit tests. - - The initialization involves these steps: - - 1. `CLI.parse_globals` parses the global options, which get stored - in ``CLI.options``, and stores the remaining args in - ``CLI.cmd_argv``. - - 2. `CLI.bootstrap` initializes the environment information in - ``CLI.api.env``. - - 3. `CLI.load_plugins` registers all plugins, including the - CLI-specific plugins. - - 4. `CLI.finalize` instantiates all plugins and performs the - remaining initialization needed to use the `plugable.API` - instance. - """ - self.__doing('run_real') - self.finalize() - if self.api.env.mode == 'unit_test': - return - if len(self.cmd_argv) < 1: - self.api.Command.help() + def run(self, argv): + if len(argv) == 0: + self.Command.help() return - key = self.cmd_argv[0] - if key not in self: - raise CommandError(name=key) - self.run_cmd(self[key]) - - def finalize(self): - """ - Fully initialize ``CLI.api`` `plugable.API` instance. - - This method first calls `CLI.load_plugins` to perform some dependant - initialization steps, after which `plugable.API.finalize` is called. - - Finally, the CLI-specific commands are passed a reference to this - `CLI` instance by calling `frontend.Application.set_application`. - """ - self.__doing('finalize') - self.load_plugins() - self.api.finalize() - for a in self.api.Application(): - a.set_application(self) - assert self.__d is None - self.__d = dict( - (c.name.replace('_', '-'), c) for c in self.api.Command() - ) - self.textui = self.api.Backend.textui - if self.api.env.in_server is False and 'xmlclient' in self.api.Backend: - self.api.Backend.xmlclient.connect() - - def load_plugins(self): - """ - Load all standard plugins plus the CLI-specific plugins. - - This method first calls `CLI.bootstrap` to preform some dependant - initialization steps, after which `plugable.API.load_plugins` is - called. - - Finally, all the CLI-specific plugins are registered. - """ - self.__doing('load_plugins') - if 'bootstrap' not in self.__done: - self.bootstrap() - self.api.load_plugins() - for klass in cli_application_commands: - self.api.register(klass) - self.api.register(textui) - - def bootstrap(self): - """ - Initialize the ``CLI.api.env`` environment variables. - - This method first calls `CLI.parse_globals` to perform some dependant - initialization steps. Then, using environment variables that may have - been passed in the global options, the ``overrides`` are constructed - and `plugable.API.bootstrap` is called. - """ - self.__doing('bootstrap') - self.parse_globals() - self.api.bootstrap_with_global_options(self.options, context='cli') - - def parse_globals(self): - """ - Parse out the global options. - - This method parses the global options out of the ``CLI.argv`` instance - attribute, after which two new instance attributes are available: - - 1. ``CLI.options`` - an ``optparse.Values`` instance containing - the global options. - - 2. ``CLI.cmd_argv`` - a tuple containing the remainder of - ``CLI.argv`` after the global options have been consumed. - - The common global options are added using the - `util.add_global_options` function. - """ - self.__doing('parse_globals') - parser = optparse.OptionParser() - parser.disable_interspersed_args() - parser.add_option('-a', dest='prompt_all', action='store_true', - help='Prompt for all missing options interactively') - parser.add_option('-n', dest='interactive', action='store_false', - help='Don\'t prompt for any options interactively') - parser.set_defaults( - prompt_all=False, - interactive=True, - ) - util.add_global_options(parser) - (options, args) = parser.parse_args(list(self.argv)) - self.options = options - self.cmd_argv = tuple(args) - - def __doing(self, name): - if name in self.__done: - raise StandardError( - '%s.%s() already called' % (self.__class__.__name__, name) - ) - self.__done.add(name) - - def run_cmd(self, cmd): - kw = self.parse(cmd) - if self.options.interactive: + (key, argv) = (argv[0], argv[1:]) + cmd = self.get_command(key) + kw = self.parse(cmd, argv) + if self.env.interactive: self.prompt_interactively(cmd, kw) - self.prompt_for_passwords(cmd, kw) + self.create_context() result = cmd(**kw) if callable(cmd.output_for_cli): for param in cmd.params(): @@ -700,58 +543,21 @@ class CLI(object): (args, options) = cmd.params_2_args_options(**kw) cmd.output_for_cli(self.api.Backend.textui, result, *args, **options) - def prompt_for_passwords(self, cmd, kw): - for param in cmd.params(): - if not param.password: - continue - if kw.get(param.name, False) is True or param.name in cmd.args: - kw[param.name] = self.textui.prompt_password( - param.cli_name - ) - else: - kw.pop(param.name, None) - return kw - - def prompt_interactively(self, cmd, kw): - """ - Interactively prompt for missing or invalid values. - - By default this method will only prompt for *required* Param that - have a missing or invalid value. However, if - ``CLI.options.prompt_all`` is True, this method will prompt for any - params that have a missing or required values, even if the param is - optional. - """ - for param in cmd.params(): - if param.password or param.autofill: - continue - elif param.name not in kw: - if not param.required and not self.options.prompt_all: - continue - default = param.get_default(**kw) - error = None - while True: - if error is not None: - print '>>> %s: %s' % (param.cli_name, error) - raw = self.textui.prompt(param.cli_name, default) - try: - value = param(raw, **kw) - if value is not None: - kw[param.name] = value - break - except errors.ValidationError, e: - error = e.error - return kw + def get_command(self, key): + name = from_cli(key) + if name not in self.Command: + raise CommandError(name=key) + return self.Command[name] - def parse(self, cmd): + def parse(self, cmd, argv): parser = self.build_parser(cmd) - (kwc, args) = parser.parse_args( - list(self.cmd_argv[1:]), KWCollector() - ) - options = kwc.__todict__() + (collector, args) = parser.parse_args(argv, Collector()) + options = collector.__todict__() kw = cmd.args_options_2_params(*args, **options) return dict(self.parse_iter(cmd, kw)) + # FIXME: Probably move decoding to Command, use same method regardless of + # request source: def parse_iter(self, cmd, kw): """ Decode param values if appropriate. @@ -761,19 +567,18 @@ class CLI(object): if isinstance(param, Bytes): yield (key, value) else: - yield (key, self.textui.decode(value)) - + yield (key, self.Backend.textui.decode(value)) def build_parser(self, cmd): parser = optparse.OptionParser( - usage=self.get_usage(cmd), + usage=' '.join(self.usage_iter(cmd)) ) for option in cmd.options(): kw = dict( dest=option.name, help=option.doc, ) - if option.password: + if option.password and self.env.interactive: kw['action'] = 'store_true' elif option.type is bool: if option.default is True: @@ -786,10 +591,7 @@ class CLI(object): parser.add_option(o) return parser - def get_usage(self, cmd): - return ' '.join(self.get_usage_iter(cmd)) - - def get_usage_iter(self, cmd): + def usage_iter(self, cmd): yield 'Usage: %%prog [global-options] %s' % to_cli(cmd.name) for arg in cmd.args(): if arg.password: @@ -802,64 +604,23 @@ class CLI(object): else: yield '[%s]' % name - def __get_mcl(self): - """ - Returns the Max Command Length. - """ - if self.__mcl is None: - if self.__d is None: - return None - self.__mcl = max(len(k) for k in self.__d) - return self.__mcl - mcl = property(__get_mcl) - - def isdone(self, name): - """ - Return True in method named ``name`` has already been called. - """ - return name in self.__done - - def __contains__(self, key): - assert self.__d is not None, 'you must call finalize() first' - return key in self.__d - - def __getitem__(self, key): - assert self.__d is not None, 'you must call finalize() first' - return self.__d[key] - - -class cli(backend.Executioner): - """ - Backend plugin for executing from command line interface. - """ - - def run(self, argv): - if len(argv) == 0: - self.Command.help() - return - (key, argv) = (argv[0], argv[1:]) - cmd = self.get_command(key) - (kw, collector) = self.parse(cmd, argv) - if collector._interactive: - self.prompt_interactively(cmd, kw, collector) - self.create_context() - - def prompt_interactively(self, cmd, kw, collector): + def prompt_interactively(self, cmd, kw): """ Interactively prompt for missing or invalid values. By default this method will only prompt for *required* Param that have a missing or invalid value. However, if - ``CLI.options.prompt_all`` is True, this method will prompt for any - params that have a missing or required values, even if the param is - optional. + ``self.env.prompt_all`` is ``True``, this method will prompt for any + params that have a missing values, even if the param is optional. """ for param in cmd.params(): - if param.password or param.autofill: + if param.password: + if kw.get(param.name, False) is True or param.name in cmd.args: + kw[param.name] = \ + self.Backend.textui.prompt_password(param.cli_name) + elif param.autofill or param.name in kw: continue - elif param.name not in kw: - if not param.required and not collector._prompt_all: - continue + elif param.required or self.env.prompt_all: default = param.get_default(**kw) error = None while True: @@ -874,98 +635,33 @@ class cli(backend.Executioner): except errors.ValidationError, e: error = e.error - def get_command(self, key): - name = from_cli(key) - if name not in self.Command: - raise CommandError(name=key) - return self.Command[name] - - def parse(self, cmd, argv): - parser = self.build_parser(cmd) - (collector, args) = parser.parse_args(argv, - Collector(_prompt_all=False, interactive=True) - ) - options = collector.__todict__() - kw = cmd.args_options_2_params(*args, **options) - return (dict(self.parse_iter(cmd, kw)), collector) - - # FIXME: Move decoding to Command, use same regardless of request source - def parse_iter(self, cmd, kw): - """ - Decode param values if appropriate. - """ - for (key, value) in kw.iteritems(): - param = cmd.params[key] - if isinstance(param, Bytes): - yield (key, value) - else: - yield (key, self.Backend.textui.decode(value)) - - def build_parser(self, cmd): - parser = optparse.OptionParser( - usage=' '.join(self.usage_iter(cmd)) - ) - if len(cmd.params) > 0: - parser.add_option('-a', dest='_prompt_all', action='store_true', - help='Prompt for all values interactively') - parser.add_option('-n', dest='_interactive', action='store_false', - help="Don\'t prompt for any values interactively") - for option in cmd.options(): - kw = dict( - dest=option.name, - help=option.doc, - ) - if option.password: - kw['action'] = 'store_true' - elif option.type is bool: - if option.default is True: - kw['action'] = 'store_false' - else: - kw['action'] = 'store_true' - else: - kw['metavar'] = metavar=option.__class__.__name__.upper() - o = optparse.make_option('--%s' % to_cli(option.cli_name), **kw) - parser.add_option(o) - return parser - - def usage_iter(self, cmd): - yield 'Usage: %%prog [global-options] %s' % to_cli(cmd.name) - for arg in cmd.args(): - if arg.password: - continue - name = to_cli(arg.cli_name).upper() - if arg.multivalue: - name = '%s...' % name - if arg.required: - yield name - else: - yield '[%s]' % name - cli_plugins = ( cli, textui, + console, help, ) def run(api): + error = None try: argv = api.bootstrap_with_global_options(context='cli') for klass in cli_plugins: api.register(klass) api.load_plugins() api.finalize() - api.Backend.cli.run(sys.argv[1:]) - sys.exit() + api.Backend.cli.run(argv) except KeyboardInterrupt: print '' api.log.info('operation aborted') - sys.exit() except PublicError, e: error = e except Exception, e: api.log.exception('%s: %s', e.__class__.__name__, str(e)) error = InternalError() - api.log.error(error.strerror) - sys.exit(error.errno) + if error is not None: + assert isinstance(error, PublicError) + api.log.error(error.strerror) + sys.exit(error.errno) -- cgit