summaryrefslogtreecommitdiffstats
path: root/git_taskrepo/command.py
diff options
context:
space:
mode:
authorBill Peck <bpeck@redhat.com>2015-05-05 14:12:12 -0400
committerBill Peck <bpeck@redhat.com>2015-05-05 14:12:12 -0400
commita86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5 (patch)
treec49e3624b55ee96db6a79fb8fb6edf79637eed43 /git_taskrepo/command.py
downloadtaskrepo-a86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5.tar.gz
taskrepo-a86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5.tar.xz
taskrepo-a86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5.zip
initial commit
Diffstat (limited to 'git_taskrepo/command.py')
-rw-r--r--git_taskrepo/command.py394
1 files changed, 394 insertions, 0 deletions
diff --git a/git_taskrepo/command.py b/git_taskrepo/command.py
new file mode 100644
index 0000000..9ab2af9
--- /dev/null
+++ b/git_taskrepo/command.py
@@ -0,0 +1,394 @@
+
+# -*- coding: utf-8 -*-
+
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# This is a copy and paste of various parts of kobo 0.4.2-1 needed to support the
+# creation of kobo commands
+
+import sys
+import os
+import optparse
+from optparse import Option
+import sqlite3 as lite
+from git import Repo
+
+def username_prompt(prompt=None, default_value=None):
+ """Ask for a username."""
+ if default_value is not None:
+ return default_value
+
+ prompt = prompt or "Enter your username: "
+ print >>sys.stderr, prompt,
+ return sys.stdin.readline()
+
+
+def password_prompt(prompt=None, default_value=None):
+ """Ask for a password."""
+ import getpass
+
+ if default_value is not None:
+ return default_value
+
+ prompt = prompt or "Enter your password: "
+ try:
+ # try to use stderr stream
+ result = getpass.getpass(prompt, stream=sys.stderr)
+ except TypeError:
+ # fall back to stdout
+ result = getpass.getpass(prompt)
+ return result
+
+
+def yes_no_prompt(prompt, default_value=None):
+ """Give a yes/no (y/n) question."""
+ if default_value is not None:
+ if default_value not in ("Y", "N"):
+ raise ValueError("Invalid default value: %s" % default_value)
+ default_value = default_value.upper()
+
+ prompt = "%s [%s/%s]: " % (prompt, ("y", "Y")[default_value == "Y"], ("n", "N")[default_value == "N"])
+ print >>sys.stderr, prompt,
+
+ while True:
+ user_input = sys.stdin.readline().strip().upper()
+ if user_input == "" and default_value is not None:
+ user_input = default_value
+
+ if user_input == "Y":
+ return True
+ if user_input == "N":
+ return False
+
+def are_you_sure_prompt(prompt=None):
+ """Give a yes/no (y/n) question."""
+ prompt = prompt or "Are you sure? Enter 'YES' to continue: "
+ print >>sys.stderr, prompt,
+ user_input = sys.stdin.readline().strip()
+
+ if user_input == "YES":
+ return True
+
+ return False
+
+
+class Plugin(object):
+ """A plugin base class."""
+
+ author = None
+ version = None
+ enabled = False
+
+ def __getattr__(self, name):
+ """
+ Get missing attribute from a container.
+ This is quite hackish but it allows to define settings and methods per container.
+ """
+ return getattr(self.container, name)
+
+
+class Command(Plugin):
+ """An abstract class representing a command for CommandOptionParser."""
+
+ enabled = False
+ admin = False
+
+ username_prompt = staticmethod(username_prompt)
+ password_prompt = staticmethod(password_prompt)
+ yes_no_prompt = staticmethod(yes_no_prompt)
+ are_you_sure_prompt = staticmethod(are_you_sure_prompt)
+
+ def __init__(self, parser):
+ Plugin.__init__(self)
+ self.parser = parser
+
+ def options(self):
+ """Add options to self.parser."""
+ pass
+
+ def run(self, *args, **kwargs):
+ """Run a command. Arguments contain parsed options."""
+ raise NotImplementedError()
+
+ def set_repo(self, **kwargs):
+ self.container.set_repo(**kwargs)
+
+ def set_taskrepo(self, **kwargs):
+ self.container.set_taskrepo(**kwargs)
+
+
+class PluginContainer(object):
+ """A plugin container.
+
+ Usage: Inherit PluginContainer and register plugins to the new class.
+ """
+
+ def __getitem__(self, name):
+ return self._get_plugin(name)
+
+ def __iter__(self):
+ return self.plugins.iterkeys()
+
+ @classmethod
+ def normalize_name(cls, name):
+ return name
+
+ @classmethod
+ def _get_plugins(cls):
+ """Return dictionary of registered plugins."""
+
+ result = {}
+ parent_plugins = cls._get_parent_plugins(cls.normalize_name).items()
+ class_plugins = getattr(cls, "_class_plugins", {}).items()
+ for name, plugin_class in parent_plugins + class_plugins:
+ result[name] = type(plugin_class.__name__, (plugin_class, ), {"__doc__": plugin_class.__doc__})
+ return result
+
+ @classmethod
+ def _get_parent_plugins(cls, normalize_function):
+ result = {}
+ for parent in cls.__bases__:
+ if parent is PluginContainer:
+ # don't use PluginContainer itself - plugins have to be registered to subclasses
+ continue
+
+ if not issubclass(parent, PluginContainer):
+ # skip parents which are not PluginContainer subclasses
+ continue
+
+ # read inherited plugins first (conflicts are resolved recursively)
+ plugins = parent._get_parent_plugins(normalize_function)
+
+ # read class plugins, override inherited on name conflicts
+ if hasattr(parent, "_class_plugins"):
+ for plugin_class in parent._class_plugins.values():
+ normalized_name = normalize_function(plugin_class.__name__)
+ plugins[normalized_name] = plugin_class
+
+ for name, value in plugins.iteritems():
+ if result.get(name, value) != value:
+ raise RuntimeError("Cannot register plugin '%s'. Another plugin with the same normalized name (%s) is already in the container." % (str(value), normalized_name))
+
+ result.update(plugins)
+
+ return result
+
+ @property
+ def plugins(self):
+ if not hasattr(self, "_plugins"):
+ self._plugins = self.__class__._get_plugins()
+ return self._plugins
+
+ def _get_plugin(self, name):
+ """Return a plugin or raise KeyError."""
+ normalized_name = self.normalize_name(name)
+
+ if normalized_name not in self.plugins:
+ raise KeyError("Plugin not found: %s" % normalized_name)
+
+ plugin = self.plugins[normalized_name]
+ plugin.container = self
+ plugin.normalized_name = normalized_name
+ return plugin
+
+ @classmethod
+ def register_plugin(cls, plugin):
+ """Register a new plugin. Return normalized plugin name."""
+
+ if cls is PluginContainer:
+ raise TypeError("Can't register plugin to the PluginContainer base class.")
+
+ if "_class_plugins" not in cls.__dict__:
+ cls._class_plugins = {}
+
+ if not getattr(plugin, "enabled", False):
+ return
+
+ normalized_name = cls.normalize_name(plugin.__name__)
+ cls._class_plugins[normalized_name] = plugin
+ return normalized_name
+
+ @classmethod
+ def register_module(cls, module, prefix=None, skip_broken=False):
+ """Register all plugins in a module's sub-modules.
+
+ @param module: a python module that contains plugin sub-modules
+ @type module: module
+ @param prefix: if specified, only modules with this prefix will be processed
+ @type prefix: str
+ @param skip_broken: skip broken sub-modules and print a warning
+ @type skip_broken: bool
+ """
+ path = os.path.dirname(module.__file__)
+ module_list = []
+
+ for fn in os.listdir(path):
+ if not fn.endswith(".py"):
+ continue
+ if fn.startswith("_"):
+ continue
+ if prefix and not fn.startswith(prefix):
+ continue
+ if not os.path.isfile(os.path.join(path, fn)):
+ continue
+ module_list.append(fn[:-3])
+
+ if skip_broken:
+ for mod in module_list[:]:
+ try:
+ __import__(module.__name__, {}, {}, [mod])
+ except:
+ import sys
+ print >> sys.stderr, "WARNING: Skipping broken plugin module: %s.%s" % (module.__name__, mod)
+ module_list.remove(mod)
+ else:
+ __import__(module.__name__, {}, {}, module_list)
+
+ for mn in module_list:
+ mod = getattr(module, mn)
+ for pn in dir(mod):
+ plugin = getattr(mod, pn)
+ if type(plugin) is type and issubclass(plugin, Plugin) and plugin is not Plugin:
+ cls.register_plugin(plugin)
+
+
+class CommandContainer(PluginContainer):
+ """Container for Command classes."""
+
+ @classmethod
+ def normalize_name(cls, name):
+ """Replace some characters in command names."""
+ return name.lower().replace('_', '-').replace(' ', '-')
+
+ def set_repo(self, **kwargs):
+ self.repo = Repo(os.getcwd(), search_parent_directories=True)
+
+ def set_taskrepo(self, init=False, **kwargs):
+ # do some work to find the default location for the db
+
+ db_location = os.path.join(self.repo.working_tree_dir, "taskrepo.db")
+ if not init and not os.path.exists(db_location):
+ raise ValueError(u'Not a valid taskrepo, please init first! (git taskrepo init)')
+ conn = lite.connect(db_location)
+ self.taskrepo = conn
+
+class CommandOptionParser(optparse.OptionParser):
+ """Enhanced OptionParser with plugin support."""
+ def __init__(self,
+ usage=None,
+ option_list=None,
+ option_class=Option,
+ version=None,
+ conflict_handler="error",
+ description=None,
+ formatter=None,
+ add_help_option=True,
+ prog=None,
+ command_container=None,
+ default_command="help",
+ add_username_password_options=False):
+
+ usage = usage or "%prog <command> [args] [--help]"
+ self.container = command_container
+ self.default_command = default_command
+ self.command = None
+ formatter = formatter or optparse.IndentedHelpFormatter(max_help_position=33)
+
+ optparse.OptionParser.__init__(self, usage, option_list, option_class, version, conflict_handler, description, formatter, add_help_option, prog)
+
+ if add_username_password_options:
+ option_list = [
+ optparse.Option("--username", help="specify user"),
+ optparse.Option("--password", help="specify password"),
+ ]
+ self._populate_option_list(option_list, add_help=False)
+
+ def print_help(self, file=None, admin=False):
+ if file is None:
+ file = sys.stdout
+ file.write(self.format_help())
+ if self.command in (None, "help", "help-admin"):
+ file.write("\n")
+ file.write(self.format_help_commands(admin=admin))
+
+ def format_help_commands(self, admin=False):
+ commands = []
+ admin_commands = []
+
+ for name, plugin in sorted(self.container.plugins.iteritems()):
+ is_admin = getattr(plugin, "admin", False)
+ text = " %-30s %s" % (name, plugin.__doc__ or "")
+ if is_admin:
+ if admin:
+ admin_commands.append(text)
+ else:
+ commands.append(text)
+
+ if commands:
+ commands.insert(0, "commands:")
+ commands.append("")
+
+ if admin_commands:
+ admin_commands.insert(0, "admin commands:")
+ admin_commands.append("")
+
+ return "\n".join(commands + admin_commands)
+
+ def parse_args(self, args=None, values=None):
+ """return (command_instance, opts, args)"""
+ args = self._get_args(args)
+ command = None
+
+ if len(args) > 0 and not args[0].startswith("-"):
+ command = args[0]
+ args = args[1:]
+ else:
+ command = self.default_command
+ # keep args as is
+
+ if not command in self.container.plugins:
+ self.error("unknown command: %s" % command)
+
+ CommandClass = self.container[command]
+ cmd = CommandClass(self)
+ if self.command != cmd.normalized_name:
+ self.command = cmd.normalized_name
+ cmd.options()
+ cmd_opts, cmd_args = optparse.OptionParser.parse_args(self, args, values)
+ return (cmd, cmd_opts, cmd_args)
+
+ def run(self, args=None, values=None):
+ """parse arguments and run a command"""
+ cmd, cmd_opts, cmd_args = self.parse_args(args, values)
+ cmd_kwargs = cmd_opts.__dict__
+ cmd.run(*cmd_args, **cmd_kwargs)
+
+class Help(Command):
+ """show this help message and exit"""
+ enabled = True
+
+ def options(self):
+ pass
+
+ def run(self, *args, **kwargs):
+ self.parser.print_help(admin=False)
+
+
+class Help_Admin(Command):
+ """show help message about administrative commands and exit"""
+ enabled = True
+
+ def options(self):
+ # override default --help option
+ opt = self.parser.get_option("--help")
+ opt.action = "store_true"
+ opt.dest = "help"
+
+ def run(self, *args, **kwargs):
+ self.parser.print_help(admin=True)
+
+CommandContainer.register_plugin(Help)
+CommandContainer.register_plugin(Help_Admin)