diff options
author | lmacken@crow <> | 2007-11-25 22:13:25 -0500 |
---|---|---|
committer | Yaakov M. Nemoy <loupgaroublond@gmail.com> | 2007-11-25 22:13:25 -0500 |
commit | 40119a669a625b839503d91aa5d40e112f7e46e2 (patch) | |
tree | 63f1e782a37311bebdc49bb2d5c03efca7927f7f /devshell.py | |
download | fedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.tar.gz fedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.tar.xz fedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.zip |
initial commit
Diffstat (limited to 'devshell.py')
-rwxr-xr-x | devshell.py | 472 |
1 files changed, 472 insertions, 0 deletions
diff --git a/devshell.py b/devshell.py new file mode 100755 index 0000000..119692b --- /dev/null +++ b/devshell.py @@ -0,0 +1,472 @@ +#!/usr/bin/python -tt +# $Id: $ +# Fedora Developer Shell +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +# Authors: Luke Macken <lmacken@redhat.com> +# +# TODO: +# - query pkgdb +# - view all branch versions, and easily move versions between branches +# - List available bugzillas.. give the ability to view/control all bugs of theirs or for any component in any of the bugzillas + +# v0.1 release +# - fedora-qa!!!! +# - push updates to bodhi +# - koji building/querying +# - query versions of any package, drop into its code, view patches +# - Ability to grep mailing lists and even IRC (?!) +# - readline support@! +# - audit <project> (flawfinder/rats/etc) + +import os +import re +import stat +import gzip +import email +import urllib2 +import logging +import commands + +from glob import glob +from os.path import expanduser, exists, join, dirname, isdir +from mailbox import UnixMailbox +from datetime import datetime, timedelta + +__version__ = '0.0.1' +__description__ = 'A shell for hacking on the Fedora project' + +FEDORA_DIR = join(expanduser('~'), 'code', 'fedora') +DEVSHELL_DIR = join(expanduser('~'), '.devshell') + +repos = { + 'fedora_cvs' : ':ext:cvs.fedora.redhat.com:/cvs/pkgs', + 'bodhi_hg' : 'http://hg.fedoraproject.org/hg/hosted/bodhi', +} + +stack = [] +prompt = ['\033[34;1mfedora\033[0m'] +modules = {} +header = lambda x: "%s %s %s" % ('=' * 2, x, '=' * (76 - len(x))) +log = logging.getLogger(__name__) + +class Module: + """ The parent class of all modules """ + def __init__(self): + # setup stack and prompt + pass + +class Mail(Module): + + url = 'https://www.redhat.com/archives/%s/%s' + + def search(self, mailinglist, text): + """ <list> <text>. Search specific mailing list for given text """ + now = datetime.now() + while True: + fetch = True + filename = now.strftime("%Y-%B") + '.txt.gz' + try: + f = urllib2.urlopen(self.url % (mailinglist, filename)) + except urllib2.HTTPError: + break + local = join(DEVSHELL_DIR, mailinglist, filename) + + # Don't fetch the mbox if we already have a good local copy + if exists(local): + info = os.stat(local) + if info[stat.ST_SIZE] == int(f.headers.get('Content-Length')): + fetch = False + log.debug("Using local mbox: %s" % local) + if fetch: + if not exists(dirname(local)): + os.makedirs(dirname(local)) + mbox = file(local, 'w') + log.debug("Downloading %s" % local) + mbox.write(f.read()) + mbox.close() + f.close() + + self.__search_mbox(local, text) + + # Go back in time + now = now - timedelta(days=31) + + def __search_mbox(self, mboxfile, text): + """ + Search a compressed mbox for any specified keywords + """ + num = 0 + statinfo = os.stat(mboxfile) + gzmbox = gzip.open(mboxfile) + mbox = UnixMailbox(gzmbox, email.message_from_file) + while True: + msg = mbox.next() + if not msg: break + num += 1 + fields = [msg['From'], msg['Subject']] + self.__body(msg) + for field in fields: + if re.search(text, field, re.IGNORECASE): + id = "%s.%s" % (statinfo[stat.ST_INO], num) + print "[%s] %s %s" % (id, msg['From'], msg['Subject']) + break + gzmbox.close() + + def __body(self, msg): + """ + Recursively gather multipart message bodies + """ + body = [] + if msg.is_multipart(): + for payload in msg.get_payload(): + for part in payload.walk(): + body += self.__body(part) + else: + body = [msg.get_payload()] + return body + + def show(self, id): + """ Show a message with a given ID """ + inode, msgnum = map(int, id.split('.')) + olddir = os.getcwd() + os.chdir(DEVSHELL_DIR) + mboxfile = None + for dir in filter(isdir, os.listdir('.')): + for file in os.listdir(dir): + statinfo = os.stat(join(dir, file)) + if statinfo[stat.ST_INO] == inode: + mboxfile = join(dir, file) + break + if mboxfile: break + if mboxfile: + gzmbox = gzip.open(mboxfile) + mbox = UnixMailbox(gzmbox, email.message_from_file) + num = 0 + while num != msgnum: + msg = mbox.next() + num += 1 + self.__print_msg(msg) + else: + log.error("Cannot find message %s" % id) + os.chdir(olddir) + + def __print_msg(self, msg): + log.info("From: %s" % msg['From']) + log.info("To: %s" % msg['To']) + log.info("Subject: %s" % msg['Subject']) + log.info('\n'.join(self.__body(msg))) + + +class SCM: + """ + Our generic SCM wrapper. Based on the name_scm of the repos dictionary key, + we should be able to get our hands on any up-to-date code, and easily drop + into any file, make modifications, and commit upstream. + """ + def __init__(self, name, url): + self.url = url + self.name = name + self.type = name.split('_')[-1] + if self.type not in ('cvs', 'git', 'bzr', 'hg'): + log.error("Unknown SCM '%s' for repo %s" % (self.type, name)) + return + + def co(self, module, branch=''): + self.module = module + self.branch = branch + if os.path.isdir(os.path.join(FEDORA_DIR, module)): + # from here, we have no idea what scm we're dealing with + log.debug("%s module already checked out!" % module) + return os.path.join(FEDORA_DIR, module, branch) + path = os.getcwd() + os.chdir(FEDORA_DIR) + cmd = "%s checkout %s" % (self.type, module) + status, output = commands.getstatusoutput(cmd) + log.debug("Ran `%s`\nstatus = %s\noutput= %s" % (cmd, status, output)) + os.chdir(path) + return status == 0 + + def patches(self): + path = os.getcwd() + os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch)) + i = 0 + for patch in glob('*.patch'): + log.info(" [ %d ] %s" % (i, patch)) + i += 1 + os.chdir(path) + + def log(self, file=None): + path = os.getcwd() + os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch)) + if not file: + os.system("%s log | less" % self.type) + else: + os.system("%s log %s | less" % (self.type, file)) + os.chdir(path) + + def update(self): + path = os.getcwd() + os.chdir(FEDORA_DIR) + cmd = "%s update" % self.type + status, output = commands.getstatusoutput(cmd) + os.chdir(path) + return status == 0 + +class Pkg(Module): + + def __init__(self, name, branch='devel', scm=None): + """ + Open a given project + """ + log.debug("Pkg(%s, %s, %s)" % (name, branch, scm)) + global stack, prompt + self.name = name + self.branch = branch + self.scm = scm + stack += [self] + prompt += [name, branch] + self.__find_scm() + + def __find_scm(self): + global repos + if self.scm: + if repos.has_key(self.scm): + self.scm = SCM(scm, repos[scm]) + else: + log.debug("TODO: use specified SCM") + else: + # Try and find our corresponding SCM.. trial-and-error style. + for scm in [SCM(repo, url) for repo, url in repos.items()]: + log.debug("checking with SCM %s" % scm) + if scm.co(self.name, self.branch): + self.scm = scm + log.debug("Found relevant scm: %s" % scm) + break + if not self.scm: + log.error("Cannot find corresponding SCM for %s" % self.name) + + def spec(self): + """ + View the RPM spec file for this project + """ + editor = os.getenv('EDITOR', 'vim') + os.system("%s %s/%s.spec" % (editor, os.path.join(FEDORA_DIR, self.name, + self.branch), self.name)) + + def patches(self): + """ + List all patches applied against this package + """ + self.scm.patches() + + def log(self, item=None): + """ + Show the log + """ + self.scm.log(item) + + def patch(self, num): + self.scm.patch(num) + + def diff(self): + raise NotImplementedError + + def prep(self): + raise NotImplementedError + + def build(self): + raise NotImplementedError + + def srpm(self): + raise NotImplementedError + + def qa(self): + raise NotImplementedError + + def bugs(self): + raise NotImplementedError + +class Bugs(Module): + """ + Interface for doing useful things with Bugs. + TODO: add support for arbitrary bug trackers! + python-bugzilla integration! + """ + def view(self, data): + """ + View a given bug number, or show all bugs for a given component + Example: bugs view #1234, bugs view nethack + """ + if data[0] == '#': + os.system('firefox "https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=%s"' % data[1:]) + else: + os.system('firefox "https://bugzilla.redhat.com/bugzilla/buglist.cgi?product=Fedora+Core&component=%s&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&bug_status=MODIFIED&short_desc_type=allwordssubstr&short_desc=&long_desc_type=allwordssubstr&long_desc="' % data) + + def search(self, text): + """ + Search bugzilla for a given keyword + """ + os.system('firefox "https://bugzilla.redhat.com/buglist.cgi?query_format=specific&order=bugs.bug_id&bug_status=__open__&product=&content=%s"' % text) + +def load_modules(): + global modules + from inspect import isclass + log.debug("Loading modules") + for key in globals().keys(): + module = globals()[key] + if isclass(module): + if issubclass(module, Module) and key != 'Module': + # Cache reference to class, *not* an instance. We'll instantiate + # modules on the fly when we need them + log.debug(" * %s" % key) + modules[key.lower()] = module + +def print_docstrings(module=None): + """ + Print out the docstrings for all methods in the specified module. + If no module is specified, then display docstrings for all modules. + """ + def _print_docstrings(name, module): + log.info(header(name)) + for prop in filter(lambda x: x[0] != '_', dir(module)): + if callable(getattr(module, prop)) and hasattr(module, '__doc__') \ + and getattr(module, prop).__doc__: + log.info(" |- [%s] %s" % (prop, + getattr(module, prop).__doc__.strip())) + if not module: + for name, module in modules.items(): + _print_docstrings(name, module) + else: + _print_docstrings(str(module.__class__).split('.')[-1], module) + +def shell(): + global stack, prompt + while True: + try: + data = raw_input('/'.join(prompt) + '> ') + except (EOFError, KeyboardInterrupt): + print + break + + if not data: continue + if data in ('quit', 'exit'): break + keyword = data.split()[0] + data = data.split()[1:] + + # Show the docstrings to all methods for all loaded modules + if keyword == 'help': + print_docstrings() + + # Go up a module in our stack + elif keyword == 'up': + stack = stack[:-1] + prompt = prompt[:-1] + + # Show the docstrings for all methods in our current module + elif keyword in ('ls', 'help'): + if len(stack): + print_docstrings(stack[-1]) + else: + print_docstrings() + + # Flush our module stack + elif keyword == 'cd': + stack = [] + prompt = ['\033[34;1mfedora\033[0m'] + + # iPython. Never leave home without it. + elif keyword in ('py', 'python'): + os.system("ipython -noconfirm_exit") + + # Do some intelligent handling of unknown input. + # For the given input 'foo bar', we first check if we have the + # module 'foo' loaded, then we push it on the top of our module + # stack and check if it has the 'bar' attribute. If so, we call + # it with any remaining arguments + else: + args = [] + top = None + + # See if our keyword is a global module + if keyword in modules.keys(): + log.debug("%s is a module!" % keyword) + #top = modules[keyword] + if len(data) and not hasattr(modules[keyword], data[0]): + log.debug("Loading %s.__init__(%s)" % (keyword, data)) + top = modules[keyword](*data) + #stack += [top] + #prompt += [keyword] + data + continue + else: + log.debug("Loading %s" % keyword) + try: + top = modules[keyword]() + stack += [top] + prompt += [keyword] + except TypeError, e: + log.error("%s.%s" % (keyword, e)) + continue + + # Try the top module on our stack + if len(stack): + if hasattr(stack[-1], keyword): + log.debug("Current module has %s" % keyword) + top = getattr(stack[-1], keyword) + + # Iterate over the rest of our arguments, either going deeper + # into the top module, or appending tokens to our args list. + for value in data: + if hasattr(top, value): + log.debug("next property %s found in %s" % (value, top)) + top = getattr(top, value) + elif top: + log.debug("adding argument: %s" % value) + args.append(value) + if top: + log.debug("calling %s with %s" % (top, args)) + top(*args) + +def main(): + from optparse import OptionParser + parser = OptionParser(usage="%prog [options]", + version="%s %s" % ('%prog', __version__), + description=__description__) + parser.add_option('-v', '--verbose', action='store_true', dest='verbose', + help='Display verbose debugging output') + (opts, args) = parser.parse_args() + + # Setup our logger + sh = logging.StreamHandler() + if opts.verbose: + log.setLevel(logging.DEBUG) + sh.setLevel(logging.DEBUG) + format = logging.Formatter("[%(levelname)s] %(message)s") + else: + log.setLevel(logging.INFO) + sh.setLevel(logging.INFO) + format = logging.Formatter("%(message)s") + sh.setFormatter(format) + log.addHandler(sh) + + if not os.path.isdir(FEDORA_DIR): + log.info("Creating %s" % FEDORA_DIR) + os.makedirs(FEDORA_DIR) + + load_modules() + shell() + + +if __name__ == '__main__': + main() |