From a86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5 Mon Sep 17 00:00:00 2001 From: Bill Peck Date: Tue, 5 May 2015 14:12:12 -0400 Subject: initial commit --- git_taskrepo/command.py | 394 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 394 insertions(+) create mode 100644 git_taskrepo/command.py (limited to 'git_taskrepo/command.py') diff --git a/git_taskrepo/command.py b/git_taskrepo/command.py new file mode 100644 index 0000000..9ab2af9 --- /dev/null +++ b/git_taskrepo/command.py @@ -0,0 +1,394 @@ + +# -*- coding: utf-8 -*- + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. + +# This is a copy and paste of various parts of kobo 0.4.2-1 needed to support the +# creation of kobo commands + +import sys +import os +import optparse +from optparse import Option +import sqlite3 as lite +from git import Repo + +def username_prompt(prompt=None, default_value=None): + """Ask for a username.""" + if default_value is not None: + return default_value + + prompt = prompt or "Enter your username: " + print >>sys.stderr, prompt, + return sys.stdin.readline() + + +def password_prompt(prompt=None, default_value=None): + """Ask for a password.""" + import getpass + + if default_value is not None: + return default_value + + prompt = prompt or "Enter your password: " + try: + # try to use stderr stream + result = getpass.getpass(prompt, stream=sys.stderr) + except TypeError: + # fall back to stdout + result = getpass.getpass(prompt) + return result + + +def yes_no_prompt(prompt, default_value=None): + """Give a yes/no (y/n) question.""" + if default_value is not None: + if default_value not in ("Y", "N"): + raise ValueError("Invalid default value: %s" % default_value) + default_value = default_value.upper() + + prompt = "%s [%s/%s]: " % (prompt, ("y", "Y")[default_value == "Y"], ("n", "N")[default_value == "N"]) + print >>sys.stderr, prompt, + + while True: + user_input = sys.stdin.readline().strip().upper() + if user_input == "" and default_value is not None: + user_input = default_value + + if user_input == "Y": + return True + if user_input == "N": + return False + +def are_you_sure_prompt(prompt=None): + """Give a yes/no (y/n) question.""" + prompt = prompt or "Are you sure? Enter 'YES' to continue: " + print >>sys.stderr, prompt, + user_input = sys.stdin.readline().strip() + + if user_input == "YES": + return True + + return False + + +class Plugin(object): + """A plugin base class.""" + + author = None + version = None + enabled = False + + def __getattr__(self, name): + """ + Get missing attribute from a container. + This is quite hackish but it allows to define settings and methods per container. + """ + return getattr(self.container, name) + + +class Command(Plugin): + """An abstract class representing a command for CommandOptionParser.""" + + enabled = False + admin = False + + username_prompt = staticmethod(username_prompt) + password_prompt = staticmethod(password_prompt) + yes_no_prompt = staticmethod(yes_no_prompt) + are_you_sure_prompt = staticmethod(are_you_sure_prompt) + + def __init__(self, parser): + Plugin.__init__(self) + self.parser = parser + + def options(self): + """Add options to self.parser.""" + pass + + def run(self, *args, **kwargs): + """Run a command. Arguments contain parsed options.""" + raise NotImplementedError() + + def set_repo(self, **kwargs): + self.container.set_repo(**kwargs) + + def set_taskrepo(self, **kwargs): + self.container.set_taskrepo(**kwargs) + + +class PluginContainer(object): + """A plugin container. + + Usage: Inherit PluginContainer and register plugins to the new class. + """ + + def __getitem__(self, name): + return self._get_plugin(name) + + def __iter__(self): + return self.plugins.iterkeys() + + @classmethod + def normalize_name(cls, name): + return name + + @classmethod + def _get_plugins(cls): + """Return dictionary of registered plugins.""" + + result = {} + parent_plugins = cls._get_parent_plugins(cls.normalize_name).items() + class_plugins = getattr(cls, "_class_plugins", {}).items() + for name, plugin_class in parent_plugins + class_plugins: + result[name] = type(plugin_class.__name__, (plugin_class, ), {"__doc__": plugin_class.__doc__}) + return result + + @classmethod + def _get_parent_plugins(cls, normalize_function): + result = {} + for parent in cls.__bases__: + if parent is PluginContainer: + # don't use PluginContainer itself - plugins have to be registered to subclasses + continue + + if not issubclass(parent, PluginContainer): + # skip parents which are not PluginContainer subclasses + continue + + # read inherited plugins first (conflicts are resolved recursively) + plugins = parent._get_parent_plugins(normalize_function) + + # read class plugins, override inherited on name conflicts + if hasattr(parent, "_class_plugins"): + for plugin_class in parent._class_plugins.values(): + normalized_name = normalize_function(plugin_class.__name__) + plugins[normalized_name] = plugin_class + + for name, value in plugins.iteritems(): + if result.get(name, value) != value: + raise RuntimeError("Cannot register plugin '%s'. Another plugin with the same normalized name (%s) is already in the container." % (str(value), normalized_name)) + + result.update(plugins) + + return result + + @property + def plugins(self): + if not hasattr(self, "_plugins"): + self._plugins = self.__class__._get_plugins() + return self._plugins + + def _get_plugin(self, name): + """Return a plugin or raise KeyError.""" + normalized_name = self.normalize_name(name) + + if normalized_name not in self.plugins: + raise KeyError("Plugin not found: %s" % normalized_name) + + plugin = self.plugins[normalized_name] + plugin.container = self + plugin.normalized_name = normalized_name + return plugin + + @classmethod + def register_plugin(cls, plugin): + """Register a new plugin. Return normalized plugin name.""" + + if cls is PluginContainer: + raise TypeError("Can't register plugin to the PluginContainer base class.") + + if "_class_plugins" not in cls.__dict__: + cls._class_plugins = {} + + if not getattr(plugin, "enabled", False): + return + + normalized_name = cls.normalize_name(plugin.__name__) + cls._class_plugins[normalized_name] = plugin + return normalized_name + + @classmethod + def register_module(cls, module, prefix=None, skip_broken=False): + """Register all plugins in a module's sub-modules. + + @param module: a python module that contains plugin sub-modules + @type module: module + @param prefix: if specified, only modules with this prefix will be processed + @type prefix: str + @param skip_broken: skip broken sub-modules and print a warning + @type skip_broken: bool + """ + path = os.path.dirname(module.__file__) + module_list = [] + + for fn in os.listdir(path): + if not fn.endswith(".py"): + continue + if fn.startswith("_"): + continue + if prefix and not fn.startswith(prefix): + continue + if not os.path.isfile(os.path.join(path, fn)): + continue + module_list.append(fn[:-3]) + + if skip_broken: + for mod in module_list[:]: + try: + __import__(module.__name__, {}, {}, [mod]) + except: + import sys + print >> sys.stderr, "WARNING: Skipping broken plugin module: %s.%s" % (module.__name__, mod) + module_list.remove(mod) + else: + __import__(module.__name__, {}, {}, module_list) + + for mn in module_list: + mod = getattr(module, mn) + for pn in dir(mod): + plugin = getattr(mod, pn) + if type(plugin) is type and issubclass(plugin, Plugin) and plugin is not Plugin: + cls.register_plugin(plugin) + + +class CommandContainer(PluginContainer): + """Container for Command classes.""" + + @classmethod + def normalize_name(cls, name): + """Replace some characters in command names.""" + return name.lower().replace('_', '-').replace(' ', '-') + + def set_repo(self, **kwargs): + self.repo = Repo(os.getcwd(), search_parent_directories=True) + + def set_taskrepo(self, init=False, **kwargs): + # do some work to find the default location for the db + + db_location = os.path.join(self.repo.working_tree_dir, "taskrepo.db") + if not init and not os.path.exists(db_location): + raise ValueError(u'Not a valid taskrepo, please init first! (git taskrepo init)') + conn = lite.connect(db_location) + self.taskrepo = conn + +class CommandOptionParser(optparse.OptionParser): + """Enhanced OptionParser with plugin support.""" + def __init__(self, + usage=None, + option_list=None, + option_class=Option, + version=None, + conflict_handler="error", + description=None, + formatter=None, + add_help_option=True, + prog=None, + command_container=None, + default_command="help", + add_username_password_options=False): + + usage = usage or "%prog [args] [--help]" + self.container = command_container + self.default_command = default_command + self.command = None + formatter = formatter or optparse.IndentedHelpFormatter(max_help_position=33) + + optparse.OptionParser.__init__(self, usage, option_list, option_class, version, conflict_handler, description, formatter, add_help_option, prog) + + if add_username_password_options: + option_list = [ + optparse.Option("--username", help="specify user"), + optparse.Option("--password", help="specify password"), + ] + self._populate_option_list(option_list, add_help=False) + + def print_help(self, file=None, admin=False): + if file is None: + file = sys.stdout + file.write(self.format_help()) + if self.command in (None, "help", "help-admin"): + file.write("\n") + file.write(self.format_help_commands(admin=admin)) + + def format_help_commands(self, admin=False): + commands = [] + admin_commands = [] + + for name, plugin in sorted(self.container.plugins.iteritems()): + is_admin = getattr(plugin, "admin", False) + text = " %-30s %s" % (name, plugin.__doc__ or "") + if is_admin: + if admin: + admin_commands.append(text) + else: + commands.append(text) + + if commands: + commands.insert(0, "commands:") + commands.append("") + + if admin_commands: + admin_commands.insert(0, "admin commands:") + admin_commands.append("") + + return "\n".join(commands + admin_commands) + + def parse_args(self, args=None, values=None): + """return (command_instance, opts, args)""" + args = self._get_args(args) + command = None + + if len(args) > 0 and not args[0].startswith("-"): + command = args[0] + args = args[1:] + else: + command = self.default_command + # keep args as is + + if not command in self.container.plugins: + self.error("unknown command: %s" % command) + + CommandClass = self.container[command] + cmd = CommandClass(self) + if self.command != cmd.normalized_name: + self.command = cmd.normalized_name + cmd.options() + cmd_opts, cmd_args = optparse.OptionParser.parse_args(self, args, values) + return (cmd, cmd_opts, cmd_args) + + def run(self, args=None, values=None): + """parse arguments and run a command""" + cmd, cmd_opts, cmd_args = self.parse_args(args, values) + cmd_kwargs = cmd_opts.__dict__ + cmd.run(*cmd_args, **cmd_kwargs) + +class Help(Command): + """show this help message and exit""" + enabled = True + + def options(self): + pass + + def run(self, *args, **kwargs): + self.parser.print_help(admin=False) + + +class Help_Admin(Command): + """show help message about administrative commands and exit""" + enabled = True + + def options(self): + # override default --help option + opt = self.parser.get_option("--help") + opt.action = "store_true" + opt.dest = "help" + + def run(self, *args, **kwargs): + self.parser.print_help(admin=True) + +CommandContainer.register_plugin(Help) +CommandContainer.register_plugin(Help_Admin) -- cgit