summaryrefslogtreecommitdiffstats
path: root/devshell.py
diff options
context:
space:
mode:
authorlmacken@crow <>2007-11-25 22:13:25 -0500
committerYaakov M. Nemoy <loupgaroublond@gmail.com>2007-11-25 22:13:25 -0500
commit40119a669a625b839503d91aa5d40e112f7e46e2 (patch)
tree63f1e782a37311bebdc49bb2d5c03efca7927f7f /devshell.py
downloadfedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.tar.gz
fedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.tar.xz
fedora-devshell-40119a669a625b839503d91aa5d40e112f7e46e2.zip
initial commit
Diffstat (limited to 'devshell.py')
-rwxr-xr-xdevshell.py472
1 files changed, 472 insertions, 0 deletions
diff --git a/devshell.py b/devshell.py
new file mode 100755
index 0000000..119692b
--- /dev/null
+++ b/devshell.py
@@ -0,0 +1,472 @@
+#!/usr/bin/python -tt
+# $Id: $
+# Fedora Developer Shell
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Library General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+# Authors: Luke Macken <lmacken@redhat.com>
+#
+# TODO:
+# - query pkgdb
+# - view all branch versions, and easily move versions between branches
+# - List available bugzillas.. give the ability to view/control all bugs of theirs or for any component in any of the bugzillas
+
+# v0.1 release
+# - fedora-qa!!!!
+# - push updates to bodhi
+# - koji building/querying
+# - query versions of any package, drop into its code, view patches
+# - Ability to grep mailing lists and even IRC (?!)
+# - readline support@!
+# - audit <project> (flawfinder/rats/etc)
+
+import os
+import re
+import stat
+import gzip
+import email
+import urllib2
+import logging
+import commands
+
+from glob import glob
+from os.path import expanduser, exists, join, dirname, isdir
+from mailbox import UnixMailbox
+from datetime import datetime, timedelta
+
+__version__ = '0.0.1'
+__description__ = 'A shell for hacking on the Fedora project'
+
+FEDORA_DIR = join(expanduser('~'), 'code', 'fedora')
+DEVSHELL_DIR = join(expanduser('~'), '.devshell')
+
+repos = {
+ 'fedora_cvs' : ':ext:cvs.fedora.redhat.com:/cvs/pkgs',
+ 'bodhi_hg' : 'http://hg.fedoraproject.org/hg/hosted/bodhi',
+}
+
+stack = []
+prompt = ['\033[34;1mfedora\033[0m']
+modules = {}
+header = lambda x: "%s %s %s" % ('=' * 2, x, '=' * (76 - len(x)))
+log = logging.getLogger(__name__)
+
+class Module:
+ """ The parent class of all modules """
+ def __init__(self):
+ # setup stack and prompt
+ pass
+
+class Mail(Module):
+
+ url = 'https://www.redhat.com/archives/%s/%s'
+
+ def search(self, mailinglist, text):
+ """ <list> <text>. Search specific mailing list for given text """
+ now = datetime.now()
+ while True:
+ fetch = True
+ filename = now.strftime("%Y-%B") + '.txt.gz'
+ try:
+ f = urllib2.urlopen(self.url % (mailinglist, filename))
+ except urllib2.HTTPError:
+ break
+ local = join(DEVSHELL_DIR, mailinglist, filename)
+
+ # Don't fetch the mbox if we already have a good local copy
+ if exists(local):
+ info = os.stat(local)
+ if info[stat.ST_SIZE] == int(f.headers.get('Content-Length')):
+ fetch = False
+ log.debug("Using local mbox: %s" % local)
+ if fetch:
+ if not exists(dirname(local)):
+ os.makedirs(dirname(local))
+ mbox = file(local, 'w')
+ log.debug("Downloading %s" % local)
+ mbox.write(f.read())
+ mbox.close()
+ f.close()
+
+ self.__search_mbox(local, text)
+
+ # Go back in time
+ now = now - timedelta(days=31)
+
+ def __search_mbox(self, mboxfile, text):
+ """
+ Search a compressed mbox for any specified keywords
+ """
+ num = 0
+ statinfo = os.stat(mboxfile)
+ gzmbox = gzip.open(mboxfile)
+ mbox = UnixMailbox(gzmbox, email.message_from_file)
+ while True:
+ msg = mbox.next()
+ if not msg: break
+ num += 1
+ fields = [msg['From'], msg['Subject']] + self.__body(msg)
+ for field in fields:
+ if re.search(text, field, re.IGNORECASE):
+ id = "%s.%s" % (statinfo[stat.ST_INO], num)
+ print "[%s] %s %s" % (id, msg['From'], msg['Subject'])
+ break
+ gzmbox.close()
+
+ def __body(self, msg):
+ """
+ Recursively gather multipart message bodies
+ """
+ body = []
+ if msg.is_multipart():
+ for payload in msg.get_payload():
+ for part in payload.walk():
+ body += self.__body(part)
+ else:
+ body = [msg.get_payload()]
+ return body
+
+ def show(self, id):
+ """ Show a message with a given ID """
+ inode, msgnum = map(int, id.split('.'))
+ olddir = os.getcwd()
+ os.chdir(DEVSHELL_DIR)
+ mboxfile = None
+ for dir in filter(isdir, os.listdir('.')):
+ for file in os.listdir(dir):
+ statinfo = os.stat(join(dir, file))
+ if statinfo[stat.ST_INO] == inode:
+ mboxfile = join(dir, file)
+ break
+ if mboxfile: break
+ if mboxfile:
+ gzmbox = gzip.open(mboxfile)
+ mbox = UnixMailbox(gzmbox, email.message_from_file)
+ num = 0
+ while num != msgnum:
+ msg = mbox.next()
+ num += 1
+ self.__print_msg(msg)
+ else:
+ log.error("Cannot find message %s" % id)
+ os.chdir(olddir)
+
+ def __print_msg(self, msg):
+ log.info("From: %s" % msg['From'])
+ log.info("To: %s" % msg['To'])
+ log.info("Subject: %s" % msg['Subject'])
+ log.info('\n'.join(self.__body(msg)))
+
+
+class SCM:
+ """
+ Our generic SCM wrapper. Based on the name_scm of the repos dictionary key,
+ we should be able to get our hands on any up-to-date code, and easily drop
+ into any file, make modifications, and commit upstream.
+ """
+ def __init__(self, name, url):
+ self.url = url
+ self.name = name
+ self.type = name.split('_')[-1]
+ if self.type not in ('cvs', 'git', 'bzr', 'hg'):
+ log.error("Unknown SCM '%s' for repo %s" % (self.type, name))
+ return
+
+ def co(self, module, branch=''):
+ self.module = module
+ self.branch = branch
+ if os.path.isdir(os.path.join(FEDORA_DIR, module)):
+ # from here, we have no idea what scm we're dealing with
+ log.debug("%s module already checked out!" % module)
+ return os.path.join(FEDORA_DIR, module, branch)
+ path = os.getcwd()
+ os.chdir(FEDORA_DIR)
+ cmd = "%s checkout %s" % (self.type, module)
+ status, output = commands.getstatusoutput(cmd)
+ log.debug("Ran `%s`\nstatus = %s\noutput= %s" % (cmd, status, output))
+ os.chdir(path)
+ return status == 0
+
+ def patches(self):
+ path = os.getcwd()
+ os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch))
+ i = 0
+ for patch in glob('*.patch'):
+ log.info(" [ %d ] %s" % (i, patch))
+ i += 1
+ os.chdir(path)
+
+ def log(self, file=None):
+ path = os.getcwd()
+ os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch))
+ if not file:
+ os.system("%s log | less" % self.type)
+ else:
+ os.system("%s log %s | less" % (self.type, file))
+ os.chdir(path)
+
+ def update(self):
+ path = os.getcwd()
+ os.chdir(FEDORA_DIR)
+ cmd = "%s update" % self.type
+ status, output = commands.getstatusoutput(cmd)
+ os.chdir(path)
+ return status == 0
+
+class Pkg(Module):
+
+ def __init__(self, name, branch='devel', scm=None):
+ """
+ Open a given project
+ """
+ log.debug("Pkg(%s, %s, %s)" % (name, branch, scm))
+ global stack, prompt
+ self.name = name
+ self.branch = branch
+ self.scm = scm
+ stack += [self]
+ prompt += [name, branch]
+ self.__find_scm()
+
+ def __find_scm(self):
+ global repos
+ if self.scm:
+ if repos.has_key(self.scm):
+ self.scm = SCM(scm, repos[scm])
+ else:
+ log.debug("TODO: use specified SCM")
+ else:
+ # Try and find our corresponding SCM.. trial-and-error style.
+ for scm in [SCM(repo, url) for repo, url in repos.items()]:
+ log.debug("checking with SCM %s" % scm)
+ if scm.co(self.name, self.branch):
+ self.scm = scm
+ log.debug("Found relevant scm: %s" % scm)
+ break
+ if not self.scm:
+ log.error("Cannot find corresponding SCM for %s" % self.name)
+
+ def spec(self):
+ """
+ View the RPM spec file for this project
+ """
+ editor = os.getenv('EDITOR', 'vim')
+ os.system("%s %s/%s.spec" % (editor, os.path.join(FEDORA_DIR, self.name,
+ self.branch), self.name))
+
+ def patches(self):
+ """
+ List all patches applied against this package
+ """
+ self.scm.patches()
+
+ def log(self, item=None):
+ """
+ Show the log
+ """
+ self.scm.log(item)
+
+ def patch(self, num):
+ self.scm.patch(num)
+
+ def diff(self):
+ raise NotImplementedError
+
+ def prep(self):
+ raise NotImplementedError
+
+ def build(self):
+ raise NotImplementedError
+
+ def srpm(self):
+ raise NotImplementedError
+
+ def qa(self):
+ raise NotImplementedError
+
+ def bugs(self):
+ raise NotImplementedError
+
+class Bugs(Module):
+ """
+ Interface for doing useful things with Bugs.
+ TODO: add support for arbitrary bug trackers!
+ python-bugzilla integration!
+ """
+ def view(self, data):
+ """
+ View a given bug number, or show all bugs for a given component
+ Example: bugs view #1234, bugs view nethack
+ """
+ if data[0] == '#':
+ os.system('firefox "https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=%s"' % data[1:])
+ else:
+ os.system('firefox "https://bugzilla.redhat.com/bugzilla/buglist.cgi?product=Fedora+Core&component=%s&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&bug_status=MODIFIED&short_desc_type=allwordssubstr&short_desc=&long_desc_type=allwordssubstr&long_desc="' % data)
+
+ def search(self, text):
+ """
+ Search bugzilla for a given keyword
+ """
+ os.system('firefox "https://bugzilla.redhat.com/buglist.cgi?query_format=specific&order=bugs.bug_id&bug_status=__open__&product=&content=%s"' % text)
+
+def load_modules():
+ global modules
+ from inspect import isclass
+ log.debug("Loading modules")
+ for key in globals().keys():
+ module = globals()[key]
+ if isclass(module):
+ if issubclass(module, Module) and key != 'Module':
+ # Cache reference to class, *not* an instance. We'll instantiate
+ # modules on the fly when we need them
+ log.debug(" * %s" % key)
+ modules[key.lower()] = module
+
+def print_docstrings(module=None):
+ """
+ Print out the docstrings for all methods in the specified module.
+ If no module is specified, then display docstrings for all modules.
+ """
+ def _print_docstrings(name, module):
+ log.info(header(name))
+ for prop in filter(lambda x: x[0] != '_', dir(module)):
+ if callable(getattr(module, prop)) and hasattr(module, '__doc__') \
+ and getattr(module, prop).__doc__:
+ log.info(" |- [%s] %s" % (prop,
+ getattr(module, prop).__doc__.strip()))
+ if not module:
+ for name, module in modules.items():
+ _print_docstrings(name, module)
+ else:
+ _print_docstrings(str(module.__class__).split('.')[-1], module)
+
+def shell():
+ global stack, prompt
+ while True:
+ try:
+ data = raw_input('/'.join(prompt) + '> ')
+ except (EOFError, KeyboardInterrupt):
+ print
+ break
+
+ if not data: continue
+ if data in ('quit', 'exit'): break
+ keyword = data.split()[0]
+ data = data.split()[1:]
+
+ # Show the docstrings to all methods for all loaded modules
+ if keyword == 'help':
+ print_docstrings()
+
+ # Go up a module in our stack
+ elif keyword == 'up':
+ stack = stack[:-1]
+ prompt = prompt[:-1]
+
+ # Show the docstrings for all methods in our current module
+ elif keyword in ('ls', 'help'):
+ if len(stack):
+ print_docstrings(stack[-1])
+ else:
+ print_docstrings()
+
+ # Flush our module stack
+ elif keyword == 'cd':
+ stack = []
+ prompt = ['\033[34;1mfedora\033[0m']
+
+ # iPython. Never leave home without it.
+ elif keyword in ('py', 'python'):
+ os.system("ipython -noconfirm_exit")
+
+ # Do some intelligent handling of unknown input.
+ # For the given input 'foo bar', we first check if we have the
+ # module 'foo' loaded, then we push it on the top of our module
+ # stack and check if it has the 'bar' attribute. If so, we call
+ # it with any remaining arguments
+ else:
+ args = []
+ top = None
+
+ # See if our keyword is a global module
+ if keyword in modules.keys():
+ log.debug("%s is a module!" % keyword)
+ #top = modules[keyword]
+ if len(data) and not hasattr(modules[keyword], data[0]):
+ log.debug("Loading %s.__init__(%s)" % (keyword, data))
+ top = modules[keyword](*data)
+ #stack += [top]
+ #prompt += [keyword] + data
+ continue
+ else:
+ log.debug("Loading %s" % keyword)
+ try:
+ top = modules[keyword]()
+ stack += [top]
+ prompt += [keyword]
+ except TypeError, e:
+ log.error("%s.%s" % (keyword, e))
+ continue
+
+ # Try the top module on our stack
+ if len(stack):
+ if hasattr(stack[-1], keyword):
+ log.debug("Current module has %s" % keyword)
+ top = getattr(stack[-1], keyword)
+
+ # Iterate over the rest of our arguments, either going deeper
+ # into the top module, or appending tokens to our args list.
+ for value in data:
+ if hasattr(top, value):
+ log.debug("next property %s found in %s" % (value, top))
+ top = getattr(top, value)
+ elif top:
+ log.debug("adding argument: %s" % value)
+ args.append(value)
+ if top:
+ log.debug("calling %s with %s" % (top, args))
+ top(*args)
+
+def main():
+ from optparse import OptionParser
+ parser = OptionParser(usage="%prog [options]",
+ version="%s %s" % ('%prog', __version__),
+ description=__description__)
+ parser.add_option('-v', '--verbose', action='store_true', dest='verbose',
+ help='Display verbose debugging output')
+ (opts, args) = parser.parse_args()
+
+ # Setup our logger
+ sh = logging.StreamHandler()
+ if opts.verbose:
+ log.setLevel(logging.DEBUG)
+ sh.setLevel(logging.DEBUG)
+ format = logging.Formatter("[%(levelname)s] %(message)s")
+ else:
+ log.setLevel(logging.INFO)
+ sh.setLevel(logging.INFO)
+ format = logging.Formatter("%(message)s")
+ sh.setFormatter(format)
+ log.addHandler(sh)
+
+ if not os.path.isdir(FEDORA_DIR):
+ log.info("Creating %s" % FEDORA_DIR)
+ os.makedirs(FEDORA_DIR)
+
+ load_modules()
+ shell()
+
+
+if __name__ == '__main__':
+ main()