diff options
Diffstat (limited to 'lmi/scripts/_metacommand/interactive.py')
-rw-r--r-- | lmi/scripts/_metacommand/interactive.py | 475 |
1 files changed, 0 insertions, 475 deletions
diff --git a/lmi/scripts/_metacommand/interactive.py b/lmi/scripts/_metacommand/interactive.py deleted file mode 100644 index 349d622..0000000 --- a/lmi/scripts/_metacommand/interactive.py +++ /dev/null @@ -1,475 +0,0 @@ -# Copyright (C) 2013-2014 Red Hat, Inc. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. -# -# The views and conclusions contained in the software and documentation are -# those of the authors and should not be interpreted as representing official -# policies, either expressed or implied, of the FreeBSD Project. -# -# Authors: Michal Minar <miminar@redhat.com> -# -""" -Module with interactive application. -""" - -import cmd -import docopt -import itertools -import os -import readline -import shlex - -from lmi.scripts.common import errors -from lmi.scripts.common import get_logger -from lmi.scripts.common.command import LmiBaseCommand, LmiCommandMultiplexer -from lmi.scripts._metacommand import cmdutil -from lmi.scripts._metacommand import exit - -LOG = get_logger(__name__) - -BUILT_INS_USAGE = """ -Built-ins. - -Usage: - : cd [<path>] - : .. - : pwd - : (help | -h | --help) - -Description: - cd Nest to a subcommand. Accepts a sequence of subcommands separated - with "/". If "/" is given, top-level command becomes the active one. - Other supported special symbols are ".." and ".". - .. Make parent command the active one. Same as issuing ":cd ..". - pwd Print current command path. - help Show this help. -""" - -BUILT_INS = ('..', 'cd', 'pwd', 'help') - -class LmiBuiltInError(errors.LmiError): - """ General exception concerning the use of built-in commands. """ - pass - -class LmiCanNotNest(LmiBuiltInError): - """ Raised upon invalid use of *cd* built-in command. """ - pass - -class Interactive(cmd.Cmd): - """ - Launched by the main application. It enters *shell* mode. Allows to launch - set of commands interactively on all given hosts. Session object stays the - same for the whole life of interactive mode. - - :param top_level_cmd: Top-level command. - :type top_level_cmd: :py:class:`.toplevel.TopLevelCommand` - """ - - def __init__(self, top_level_cmd): - self._top_level_cmd = top_level_cmd - cmd.Cmd.__init__(self, - stdin=top_level_cmd.app.stdin, stdout=top_level_cmd.app.stdout) - self._last_exit_code = exit.EXIT_CODE_SUCCESS - self.doc_header = 'Static commands' - self.app.active_command = top_level_cmd - self.load_history() - - # ************************************************************************* - # Properties - # ************************************************************************* - @property - def app(self): - """ :returns: Application object. """ - return self._top_level_cmd.app - - @property - def command_manager(self): - """ :returns: An instance of :py:class:`~.manager.CommandManager`. """ - return self.app.command_manager - - @property - def on_top_level_node(self): - """ - :returns: Whether the current node is a top-level one. In other words - we're not nested in any subcommand namespace. - :rtype: boolean - """ - return self._top_level_cmd is self.app.active_command - - @property - def prompt(self): - """ - :returns: Dynamically computed shell prompt. - :rtype: string - """ - if self.app.stdin.isatty(): - parents_num = 0 - node = self.app.active_command - while node.parent is not None: - parents_num += 1 - node = node.parent - return '>'*parents_num + self.app.active_command.cmd_name + '> ' - return '' - - # ************************************************************************* - # Private methods - # ************************************************************************* - def _change_to_node(self, path): - """ - Handles any command path. It constructs an object of command - corresponding to given path. Path can be absolute or relative. - - :param str path: Path to command. Looks similar to unix file path. - Command names are separated with ``'/'``. - :returns: Multiplexer command corresponding to path. - :rtype: :py:class:`~lmi.scripts.common.command.multiplexer.LmiCommandMultiplexer` - """ - cur_node = self.app.active_command - if path.startswith('/'): - if path.startswith('/lmi'): - path = path[4:] - else: - path = path[1:] - cur_node = self._top_level_cmd - - cmd_chain = os.path.normpath(path).split('/') - for subcmd in cmd_chain: - if subcmd == '..': - cur_node = ( cur_node.parent - if cur_node.parent is not None else cur_node) - elif subcmd and subcmd != '.': - if not subcmd in cmdutil.get_subcommand_names(cur_node): - raise LmiCanNotNest('No such subcommand "%s".' % - "/".join(cmd_chain)) - cmd_cls = cmdutil.get_subcommand_factory(cur_node, subcmd) - if not issubclass(cmd_cls, LmiCommandMultiplexer): - raise LmiCanNotNest('Can not nest to subcommand "%s" which' - " is not a multiplexer." % "/".join(cmd_chain)) - if not cmd_cls.has_own_usage(): - raise LmiCanNotNest('Can not nest to subcommand "%s" which' - ' lacks any help message.' % "/".join(cmd_chain)) - cur_node = cmd_cls(self.app, subcmd, cur_node) - self.app.active_command = cur_node - return exit.EXIT_CODE_SUCCESS - - def _do_built_in_cmd(self, args): - """ - Execute built-in command. - - :param list args: Command arguments including command name. - """ - options = docopt.docopt(BUILT_INS_USAGE, args, help=False) - if options['cd']: - path = '.' if not options.get('<path>', None) else options['<path>'] - return self._change_to_node(path) - elif options['..']: - return self._change_to_node('..') - elif options['pwd']: - self.app.stdout.write("/" - + "/".join(self.app.active_command.get_cmd_name_parts( - all_parts=True)) + "\n") - elif options['help'] or options['-h'] or options['--help']: - self.app.stdout.write(BUILT_INS_USAGE[1:]) - return exit.EXIT_CODE_SUCCESS - - def _execute_line_parts(self, line_parts): - """ - Try to execute given line. This method can throw various exceptions - that needs to be handled by a caller, otherwise interactive mode will - be terminated. - - :param list line_parts: Parsed command line arguments. - :returns: Command's exit code. - """ - if line_parts[0][0] == ':': - if len(line_parts[0]) > 1: - line_parts[0] = line_parts[0][1:] - else: - line_parts = line_parts[1:] - return self._do_built_in_cmd(line_parts) - - else: - # let's try to run registered subcommand - retval = self.run_subcommand(line_parts) - if isinstance(retval, bool) or not isinstance(retval, (int, long)): - retval = ( exit.EXIT_CODE_SUCCESS - if bool(retval) or retval is None - else exit.EXIT_CODE_FAILURE) - return retval - - # ************************************************************************* - # Public methods - # ************************************************************************* - def complete(self, text, state): - """ - Overrides parent's method so that registered commands can be also - completed. - """ - if state == 0: - import readline - origline = readline.get_line_buffer() - line = origline.lstrip() - stripped = len(origline) - len(line) - begidx = readline.get_begidx() - stripped - endidx = readline.get_endidx() - stripped - command, _, _ = self.parseline(line) - compfunc = self.completedefault - if command and hasattr(self, 'complete_' + command): - compfunc = getattr(self, 'complete_' + command) - self.completion_matches = compfunc(text, line, begidx, endidx) - try: - return self.completion_matches[state] - except IndexError: - return None - - def completedefault(self, text, line, *_args, **_kwargs): - """ - Tab-completion for commands known to the command manager and subcommands - in current command namespace. Does not handle command options. - """ - if line.startswith(':'): - commands = BUILT_INS - else: - commands = set(self.completenames(text)) - commands.update(set(cmdutil.get_subcommand_names(self.app.active_command))) - completions = sorted(n for n in commands - if not text or n.startswith(text)) - return completions - - def clear_history(self): - """ Clear readline history. """ - readline.clear_history() - - def default(self, line): - """ - This is run, when line contains unknown command to ``cmd.Cmd``. It - expects us to handle it if we know it, or print an error. - - :param str line: Line given to our shell. - """ - try: - line_parts = shlex.split(line) - except ValueError as err: - LOG().error(str(err)) - return exit.EXIT_CODE_INVALID_SYNTAX - - try: - self._execute_line_parts(line_parts) - - except docopt.DocoptExit as err: - # command found, but options given to it do not comply with its - # usage string - if '--help' in line_parts: - return self.do_help(" ".join( - line_parts[:line_parts.index('--help')])) - LOG().warn("Wrong options given: %s", line.strip()) - self.stdout.write(str(err)) - if ( line_parts[0] in cmdutil.get_subcommand_names( - self.app.active_command) - and cmdutil.get_subcommand_factory(self.app.active_command, - line_parts[0]).is_end_point()): - self.stdout.write("\n\nTo see a full usage string, type:\n" - " help %s\n" % line_parts[0]) - else: - self.stdout.write("\n\nTo see a list of available commands," - " type:\n help\n") - self._last_exit_code = exit.EXIT_CODE_FAILURE - - except errors.LmiCommandNotFound as err: - LOG().error(str(err)) - self._last_exit_code = exit.EXIT_CODE_COMMAND_NOT_FOUND - - except errors.LmiUnsatisfiedDependencies as err: - LOG().error(str(err)) - self._last_exit_code = exit.EXIT_CODE_UNSATISFIED_DEPENDENCIES - - except errors.LmiError as err: - LOG().error(str(err)) - self._last_exit_code = exit.EXIT_CODE_FAILURE - - except KeyboardInterrupt as err: - LOG().debug('%s: %s', err.__class__.__name__, str(err)) - self._last_exit_code = exit.EXIT_CODE_KEYBOARD_INTERRUPT - - return self._last_exit_code - - def do_EOF(self, _arg): #pylint: disable=C0103,R0201 - """ - Exit on End-Of-File if we are on top-level command. Otherwise change - to parent command. - """ - if self.app.stdin.isatty(): - self.app.stdout.write('\n') - if self.app.stdin.isatty() and not self.on_top_level_node: - self._change_to_node('..') - else: - raise errors.LmiTerminate(self._last_exit_code) - - def do_exit(self, arg): - """ - This makes the exit command work in both (non-)interactive modes. - """ - command = ["exit"] - if arg: - command.append(arg) - return self.run_subcommand(command) - - def do_help(self, arg): - """ Handle help subcommand. """ - if arg: - try: - arg_parts = shlex.split(arg) - except ValueError as err: - LOG().error(str(err)) - return exit.EXIT_CODE_INVALID_SYNTAX - - method_name = '_'.join( - itertools.chain(['do'], - itertools.takewhile(lambda x: not x.startswith('-'), - arg_parts))) - if hasattr(self, method_name): - return cmd.Cmd.do_help(self, arg) - else: - arg_parts = [] - - if ( self.on_top_level_node - and ( not arg - or ( len(arg_parts) == 1 - and arg not in self.app.command_manager.command_names))): - if not self.completenames(arg): - LOG().error(str(errors.LmiCommandNotFound(arg))) - cmd.Cmd.do_help(self, '') - else: - cmd.Cmd.do_help(self, arg) - cmd_names = set(self.command_manager) - cmd_names.difference_update(set(self.completenames(''))) - self.print_topics( - "Application commands (type help <topic>):", - sorted(cmd_names), 15, 80) - self.print_topics( - "Built-in commands (type :help):", - [':'+bi for bi in BUILT_INS], 15, 80) - return exit.EXIT_CODE_SUCCESS - - return self.run_subcommand(['help'] + arg_parts) - - def emptyline(self): #pylint: disable=R0201 - """ Do nothing for empty line. """ - pass - - def help_exit(self): - """ Provide help for exit command. """ - cur_node = self.app.active_command - # temporarily change to top level command where help cmd is registered - self.app.active_command = self._top_level_cmd - try: - return self.run_subcommand(["help", "exit"]) - finally: - self.app.active_command = cur_node - - def help_help(self): - """ - Use the command manager to get instructions for "help". - """ - cur_node = self.app.active_command - # temporarily change to top level command where help cmd is registered - self.app.active_command = self._top_level_cmd - try: - return self.run_subcommand(["help", "help"]) - finally: - self.app.active_command = cur_node - - def load_history(self): - """ - Load a readline history file. - """ - if ( self.app.config.history_max_length == 0 - or not os.path.exists(self.app.config.history_file)): - return - LOG().debug('Reading history file "%s"', self.app.config.history_file) - try: - readline.read_history_file(self.app.config.history_file) - if ( self.app.config.history_max_length > 0 - and readline.get_current_history_length() - > self.app.config.history_max_length): - readline.set_history_length(self.app.config.history_max_length) - readline.write_history_file(self.app.config.history_file) - readline.read_history_file(self.app.config.history_file) - except (IOError, OSError) as err: - LOG().warn('Failed to read history file "%s".', - self.app.config.history_file, exc_info=err) - - def postcmd(self, stop, _line): - """ - This is called after the ``do_*`` command to postprocess its result and - decide whether to stop the shell. We want to stop only when - :py:class:`lmi.scripts.common.errors.LmiError` is raised. This - exception is catched upwards in call chain. - - :returns: Whether to stop the shell. - :rtype: bool - """ - return False - - def run_subcommand(self, args): - """ - Run a subcommand given as a first item of ``args``. It must be one of - commands registered in manager. Returns the return value of invoked - command. - - :param list args: List of commands. - """ - if not isinstance(args, (list, tuple)): - raise TypeError("args must be a list") - if len(args) < 1: - raise ValueError("args must not be empty") - try: - cmd_factory = cmdutil.get_subcommand_factory( - self.app.active_command, args[0]) - parent = self.app.active_command - except errors.LmiCommandNotFound: - # When nested into a subcommand, let it handle the args if known. - # If not known, try one of static commands. - if not self.on_top_level_node and hasattr(self, 'do_' + args[0]): - cmd_factory = self.app.command_manager.find_command(args[0]) - parent = self._top_level_cmd - else: - raise - cmd_inst = cmd_factory(self.app, args[0], parent) - return cmd_inst.run(args[1:]) - - def save_history(self): - """ - Saves current history of commands into the history file. If the length - of history exceeds a maximum history file length, the history will be - truncated. - """ - if self.app.config.history_max_length != 0: - LOG().debug('Writing history file "%s"', - self.app.config.history_file) - if self.app.config.history_max_length > 0: - readline.set_history_length(self.app.config.history_max_length) - try: - readline.write_history_file(self.app.config.history_file) - except (IOError, OSError), err: - LOG().warn('Failed to write history file "%s".', - self.app.config.history_file, exc_info=err) |