diff options
Diffstat (limited to 'devshell.py')
| -rwxr-xr-x | devshell.py | 438 |
1 files changed, 110 insertions, 328 deletions
diff --git a/devshell.py b/devshell.py index 119692b..0518cc7 100755 --- a/devshell.py +++ b/devshell.py @@ -1,5 +1,4 @@ #!/usr/bin/python -tt -# $Id: $ # Fedora Developer Shell # # This program is free software; you can redistribute it and/or modify @@ -51,11 +50,6 @@ __description__ = 'A shell for hacking on the Fedora project' FEDORA_DIR = join(expanduser('~'), 'code', 'fedora') DEVSHELL_DIR = join(expanduser('~'), '.devshell') -repos = { - 'fedora_cvs' : ':ext:cvs.fedora.redhat.com:/cvs/pkgs', - 'bodhi_hg' : 'http://hg.fedoraproject.org/hg/hosted/bodhi', -} - stack = [] prompt = ['\033[34;1mfedora\033[0m'] modules = {} @@ -63,275 +57,23 @@ header = lambda x: "%s %s %s" % ('=' * 2, x, '=' * (76 - len(x))) log = logging.getLogger(__name__) class Module: - """ The parent class of all modules """ - def __init__(self): - # setup stack and prompt - pass - -class Mail(Module): - - url = 'https://www.redhat.com/archives/%s/%s' - - def search(self, mailinglist, text): - """ <list> <text>. Search specific mailing list for given text """ - now = datetime.now() - while True: - fetch = True - filename = now.strftime("%Y-%B") + '.txt.gz' - try: - f = urllib2.urlopen(self.url % (mailinglist, filename)) - except urllib2.HTTPError: - break - local = join(DEVSHELL_DIR, mailinglist, filename) - - # Don't fetch the mbox if we already have a good local copy - if exists(local): - info = os.stat(local) - if info[stat.ST_SIZE] == int(f.headers.get('Content-Length')): - fetch = False - log.debug("Using local mbox: %s" % local) - if fetch: - if not exists(dirname(local)): - os.makedirs(dirname(local)) - mbox = file(local, 'w') - log.debug("Downloading %s" % local) - mbox.write(f.read()) - mbox.close() - f.close() - - self.__search_mbox(local, text) - - # Go back in time - now = now - timedelta(days=31) - - def __search_mbox(self, mboxfile, text): - """ - Search a compressed mbox for any specified keywords - """ - num = 0 - statinfo = os.stat(mboxfile) - gzmbox = gzip.open(mboxfile) - mbox = UnixMailbox(gzmbox, email.message_from_file) - while True: - msg = mbox.next() - if not msg: break - num += 1 - fields = [msg['From'], msg['Subject']] + self.__body(msg) - for field in fields: - if re.search(text, field, re.IGNORECASE): - id = "%s.%s" % (statinfo[stat.ST_INO], num) - print "[%s] %s %s" % (id, msg['From'], msg['Subject']) - break - gzmbox.close() - - def __body(self, msg): - """ - Recursively gather multipart message bodies - """ - body = [] - if msg.is_multipart(): - for payload in msg.get_payload(): - for part in payload.walk(): - body += self.__body(part) - else: - body = [msg.get_payload()] - return body - - def show(self, id): - """ Show a message with a given ID """ - inode, msgnum = map(int, id.split('.')) - olddir = os.getcwd() - os.chdir(DEVSHELL_DIR) - mboxfile = None - for dir in filter(isdir, os.listdir('.')): - for file in os.listdir(dir): - statinfo = os.stat(join(dir, file)) - if statinfo[stat.ST_INO] == inode: - mboxfile = join(dir, file) - break - if mboxfile: break - if mboxfile: - gzmbox = gzip.open(mboxfile) - mbox = UnixMailbox(gzmbox, email.message_from_file) - num = 0 - while num != msgnum: - msg = mbox.next() - num += 1 - self.__print_msg(msg) - else: - log.error("Cannot find message %s" % id) - os.chdir(olddir) - - def __print_msg(self, msg): - log.info("From: %s" % msg['From']) - log.info("To: %s" % msg['To']) - log.info("Subject: %s" % msg['Subject']) - log.info('\n'.join(self.__body(msg))) - - -class SCM: - """ - Our generic SCM wrapper. Based on the name_scm of the repos dictionary key, - we should be able to get our hands on any up-to-date code, and easily drop - into any file, make modifications, and commit upstream. - """ - def __init__(self, name, url): - self.url = url - self.name = name - self.type = name.split('_')[-1] - if self.type not in ('cvs', 'git', 'bzr', 'hg'): - log.error("Unknown SCM '%s' for repo %s" % (self.type, name)) - return - - def co(self, module, branch=''): - self.module = module - self.branch = branch - if os.path.isdir(os.path.join(FEDORA_DIR, module)): - # from here, we have no idea what scm we're dealing with - log.debug("%s module already checked out!" % module) - return os.path.join(FEDORA_DIR, module, branch) - path = os.getcwd() - os.chdir(FEDORA_DIR) - cmd = "%s checkout %s" % (self.type, module) - status, output = commands.getstatusoutput(cmd) - log.debug("Ran `%s`\nstatus = %s\noutput= %s" % (cmd, status, output)) - os.chdir(path) - return status == 0 - - def patches(self): - path = os.getcwd() - os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch)) - i = 0 - for patch in glob('*.patch'): - log.info(" [ %d ] %s" % (i, patch)) - i += 1 - os.chdir(path) - - def log(self, file=None): - path = os.getcwd() - os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch)) - if not file: - os.system("%s log | less" % self.type) - else: - os.system("%s log %s | less" % (self.type, file)) - os.chdir(path) - - def update(self): - path = os.getcwd() - os.chdir(FEDORA_DIR) - cmd = "%s update" % self.type - status, output = commands.getstatusoutput(cmd) - os.chdir(path) - return status == 0 - -class Pkg(Module): - - def __init__(self, name, branch='devel', scm=None): - """ - Open a given project - """ - log.debug("Pkg(%s, %s, %s)" % (name, branch, scm)) - global stack, prompt - self.name = name - self.branch = branch - self.scm = scm - stack += [self] - prompt += [name, branch] - self.__find_scm() - - def __find_scm(self): - global repos - if self.scm: - if repos.has_key(self.scm): - self.scm = SCM(scm, repos[scm]) - else: - log.debug("TODO: use specified SCM") - else: - # Try and find our corresponding SCM.. trial-and-error style. - for scm in [SCM(repo, url) for repo, url in repos.items()]: - log.debug("checking with SCM %s" % scm) - if scm.co(self.name, self.branch): - self.scm = scm - log.debug("Found relevant scm: %s" % scm) - break - if not self.scm: - log.error("Cannot find corresponding SCM for %s" % self.name) - - def spec(self): - """ - View the RPM spec file for this project - """ - editor = os.getenv('EDITOR', 'vim') - os.system("%s %s/%s.spec" % (editor, os.path.join(FEDORA_DIR, self.name, - self.branch), self.name)) - - def patches(self): - """ - List all patches applied against this package - """ - self.scm.patches() - - def log(self, item=None): - """ - Show the log - """ - self.scm.log(item) - - def patch(self, num): - self.scm.patch(num) - - def diff(self): - raise NotImplementedError - - def prep(self): - raise NotImplementedError - - def build(self): - raise NotImplementedError - - def srpm(self): - raise NotImplementedError - - def qa(self): - raise NotImplementedError - - def bugs(self): - raise NotImplementedError - -class Bugs(Module): - """ - Interface for doing useful things with Bugs. - TODO: add support for arbitrary bug trackers! - python-bugzilla integration! - """ - def view(self, data): - """ - View a given bug number, or show all bugs for a given component - Example: bugs view #1234, bugs view nethack - """ - if data[0] == '#': - os.system('firefox "https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=%s"' % data[1:]) - else: - os.system('firefox "https://bugzilla.redhat.com/bugzilla/buglist.cgi?product=Fedora+Core&component=%s&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&bug_status=MODIFIED&short_desc_type=allwordssubstr&short_desc=&long_desc_type=allwordssubstr&long_desc="' % data) - - def search(self, text): - """ - Search bugzilla for a given keyword - """ - os.system('firefox "https://bugzilla.redhat.com/buglist.cgi?query_format=specific&order=bugs.bug_id&bug_status=__open__&product=&content=%s"' % text) + """ Our parent class for all command modules """ + pass def load_modules(): global modules from inspect import isclass log.debug("Loading modules") - for key in globals().keys(): - module = globals()[key] - if isclass(module): - if issubclass(module, Module) and key != 'Module': - # Cache reference to class, *not* an instance. We'll instantiate - # modules on the fly when we need them - log.debug(" * %s" % key) - modules[key.lower()] = module + for f in os.listdir(os.path.abspath('modules')): + module_name, ext = os.path.splitext(f) + if ext == '.py': + exec "from modules import %s as module" % module_name + for item in dir(module): + obj = getattr(module, item) + if item[0] != '_' and isclass(obj): + modules[item.lower()] = obj + log.info(" * %s" % item) + del module def print_docstrings(module=None): """ @@ -359,86 +101,127 @@ def shell(): except (EOFError, KeyboardInterrupt): print break - if not data: continue if data in ('quit', 'exit'): break keyword = data.split()[0] - data = data.split()[1:] - + args = data.split()[1:] + # Show the docstrings to all methods for all loaded modules if keyword == 'help': print_docstrings() - + # Go up a module in our stack - elif keyword == 'up': + elif keyword in ('up', 'cd..', 'cd ..'): stack = stack[:-1] prompt = prompt[:-1] - + # Show the docstrings for all methods in our current module - elif keyword in ('ls', 'help'): + elif keyword in ('ls', 'help', '?'): if len(stack): print_docstrings(stack[-1]) else: print_docstrings() - + # Flush our module stack elif keyword == 'cd': stack = [] prompt = ['\033[34;1mfedora\033[0m'] - + # iPython. Never leave home without it. elif keyword in ('py', 'python'): os.system("ipython -noconfirm_exit") - - # Do some intelligent handling of unknown input. - # For the given input 'foo bar', we first check if we have the - # module 'foo' loaded, then we push it on the top of our module - # stack and check if it has the 'bar' attribute. If so, we call - # it with any remaining arguments else: - args = [] - top = None - - # See if our keyword is a global module - if keyword in modules.keys(): - log.debug("%s is a module!" % keyword) - #top = modules[keyword] - if len(data) and not hasattr(modules[keyword], data[0]): - log.debug("Loading %s.__init__(%s)" % (keyword, data)) - top = modules[keyword](*data) - #stack += [top] - #prompt += [keyword] + data - continue - else: - log.debug("Loading %s" % keyword) - try: - top = modules[keyword]() - stack += [top] - prompt += [keyword] - except TypeError, e: - log.error("%s.%s" % (keyword, e)) - continue - - # Try the top module on our stack if len(stack): - if hasattr(stack[-1], keyword): - log.debug("Current module has %s" % keyword) - top = getattr(stack[-1], keyword) - - # Iterate over the rest of our arguments, either going deeper - # into the top module, or appending tokens to our args list. - for value in data: - if hasattr(top, value): - log.debug("next property %s found in %s" % (value, top)) - top = getattr(top, value) - elif top: - log.debug("adding argument: %s" % value) - args.append(value) - if top: - log.debug("calling %s with %s" % (top, args)) - top(*args) + top = stack[-1] + else: + top = None + output, top, params = do_command(data.split(), top) + stack += [top] + prompt += [top.__class__.__name__] + params + +class ModuleError(Exception): + def __init__(self, reason, error): + self.reason = reason + self.error = error + +def load_module(name, data=[]): +# stack = [] +# prompt = [] + top = None + try: + log.debug("Loading %s.__init__(%s)" % (name, data)) + top = modules[name](*data) +# stack += [top] +# prompt += [name] + data + except TypeError, e: + log.debug('got type error') + log.error("%s: %s" % (name, e)) + raise ModuleError('probably bad call to __init__' , e) + return top + + +def do_command(data, top=None): + + # Do some intelligent handling of unknown input. + # For the given input 'foo bar', we first check if we have the + # module 'foo' loaded, then we push it on the top of our module + # stack and check if it has the 'bar' attribute. If so, we call + # it with any remaining arguments + from collections import deque + data = deque(data) + log.debug(data) + params = [] + + module = None + while len(data): + if not top: + log.debug('not top') + mod = data.popleft() + try: + module = modules[mod] + except KeyError, e: + log.error('%s is not a known module' % e.message) + return None, None, None + log.debug('mod is %s' % mod) + params = [] + while len(data): + log.debug('params so far %s' % params) + log.debug('inner loop %s' % data) + param = data.popleft() + if hasattr(module, param): + data.appendleft(param) + break + params += [param] + try: + top = module = load_module(mod, params) + mod_params = params + loaded_module = True + except ModuleError, e: + log.debug(e.reason) + break + else: + log.debug('is top') + params = [] + log.debug('params so far %s' % params) + param = data.popleft() + if hasattr(top, param): + log.debug("next property %s found in %s" % (param, top)) + top = getattr(top, param) + else: + log.debug("adding argument: %s" % param) + params += [param] + output = None + if len(params): + output = top(*params) + + if module: + return output, module, mod_params + else: + return output, None, None + def main(): + global log from optparse import OptionParser parser = OptionParser(usage="%prog [options]", version="%s %s" % ('%prog', __version__), @@ -466,7 +249,6 @@ def main(): load_modules() shell() - - + if __name__ == '__main__': - main() + main()
\ No newline at end of file |
