summaryrefslogtreecommitdiffstats
path: root/devshell.py
diff options
context:
space:
mode:
Diffstat (limited to 'devshell.py')
-rwxr-xr-xdevshell.py438
1 files changed, 110 insertions, 328 deletions
diff --git a/devshell.py b/devshell.py
index 119692b..0518cc7 100755
--- a/devshell.py
+++ b/devshell.py
@@ -1,5 +1,4 @@
#!/usr/bin/python -tt
-# $Id: $
# Fedora Developer Shell
#
# This program is free software; you can redistribute it and/or modify
@@ -51,11 +50,6 @@ __description__ = 'A shell for hacking on the Fedora project'
FEDORA_DIR = join(expanduser('~'), 'code', 'fedora')
DEVSHELL_DIR = join(expanduser('~'), '.devshell')
-repos = {
- 'fedora_cvs' : ':ext:cvs.fedora.redhat.com:/cvs/pkgs',
- 'bodhi_hg' : 'http://hg.fedoraproject.org/hg/hosted/bodhi',
-}
-
stack = []
prompt = ['\033[34;1mfedora\033[0m']
modules = {}
@@ -63,275 +57,23 @@ header = lambda x: "%s %s %s" % ('=' * 2, x, '=' * (76 - len(x)))
log = logging.getLogger(__name__)
class Module:
- """ The parent class of all modules """
- def __init__(self):
- # setup stack and prompt
- pass
-
-class Mail(Module):
-
- url = 'https://www.redhat.com/archives/%s/%s'
-
- def search(self, mailinglist, text):
- """ <list> <text>. Search specific mailing list for given text """
- now = datetime.now()
- while True:
- fetch = True
- filename = now.strftime("%Y-%B") + '.txt.gz'
- try:
- f = urllib2.urlopen(self.url % (mailinglist, filename))
- except urllib2.HTTPError:
- break
- local = join(DEVSHELL_DIR, mailinglist, filename)
-
- # Don't fetch the mbox if we already have a good local copy
- if exists(local):
- info = os.stat(local)
- if info[stat.ST_SIZE] == int(f.headers.get('Content-Length')):
- fetch = False
- log.debug("Using local mbox: %s" % local)
- if fetch:
- if not exists(dirname(local)):
- os.makedirs(dirname(local))
- mbox = file(local, 'w')
- log.debug("Downloading %s" % local)
- mbox.write(f.read())
- mbox.close()
- f.close()
-
- self.__search_mbox(local, text)
-
- # Go back in time
- now = now - timedelta(days=31)
-
- def __search_mbox(self, mboxfile, text):
- """
- Search a compressed mbox for any specified keywords
- """
- num = 0
- statinfo = os.stat(mboxfile)
- gzmbox = gzip.open(mboxfile)
- mbox = UnixMailbox(gzmbox, email.message_from_file)
- while True:
- msg = mbox.next()
- if not msg: break
- num += 1
- fields = [msg['From'], msg['Subject']] + self.__body(msg)
- for field in fields:
- if re.search(text, field, re.IGNORECASE):
- id = "%s.%s" % (statinfo[stat.ST_INO], num)
- print "[%s] %s %s" % (id, msg['From'], msg['Subject'])
- break
- gzmbox.close()
-
- def __body(self, msg):
- """
- Recursively gather multipart message bodies
- """
- body = []
- if msg.is_multipart():
- for payload in msg.get_payload():
- for part in payload.walk():
- body += self.__body(part)
- else:
- body = [msg.get_payload()]
- return body
-
- def show(self, id):
- """ Show a message with a given ID """
- inode, msgnum = map(int, id.split('.'))
- olddir = os.getcwd()
- os.chdir(DEVSHELL_DIR)
- mboxfile = None
- for dir in filter(isdir, os.listdir('.')):
- for file in os.listdir(dir):
- statinfo = os.stat(join(dir, file))
- if statinfo[stat.ST_INO] == inode:
- mboxfile = join(dir, file)
- break
- if mboxfile: break
- if mboxfile:
- gzmbox = gzip.open(mboxfile)
- mbox = UnixMailbox(gzmbox, email.message_from_file)
- num = 0
- while num != msgnum:
- msg = mbox.next()
- num += 1
- self.__print_msg(msg)
- else:
- log.error("Cannot find message %s" % id)
- os.chdir(olddir)
-
- def __print_msg(self, msg):
- log.info("From: %s" % msg['From'])
- log.info("To: %s" % msg['To'])
- log.info("Subject: %s" % msg['Subject'])
- log.info('\n'.join(self.__body(msg)))
-
-
-class SCM:
- """
- Our generic SCM wrapper. Based on the name_scm of the repos dictionary key,
- we should be able to get our hands on any up-to-date code, and easily drop
- into any file, make modifications, and commit upstream.
- """
- def __init__(self, name, url):
- self.url = url
- self.name = name
- self.type = name.split('_')[-1]
- if self.type not in ('cvs', 'git', 'bzr', 'hg'):
- log.error("Unknown SCM '%s' for repo %s" % (self.type, name))
- return
-
- def co(self, module, branch=''):
- self.module = module
- self.branch = branch
- if os.path.isdir(os.path.join(FEDORA_DIR, module)):
- # from here, we have no idea what scm we're dealing with
- log.debug("%s module already checked out!" % module)
- return os.path.join(FEDORA_DIR, module, branch)
- path = os.getcwd()
- os.chdir(FEDORA_DIR)
- cmd = "%s checkout %s" % (self.type, module)
- status, output = commands.getstatusoutput(cmd)
- log.debug("Ran `%s`\nstatus = %s\noutput= %s" % (cmd, status, output))
- os.chdir(path)
- return status == 0
-
- def patches(self):
- path = os.getcwd()
- os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch))
- i = 0
- for patch in glob('*.patch'):
- log.info(" [ %d ] %s" % (i, patch))
- i += 1
- os.chdir(path)
-
- def log(self, file=None):
- path = os.getcwd()
- os.chdir(os.path.join(FEDORA_DIR, self.module, self.branch))
- if not file:
- os.system("%s log | less" % self.type)
- else:
- os.system("%s log %s | less" % (self.type, file))
- os.chdir(path)
-
- def update(self):
- path = os.getcwd()
- os.chdir(FEDORA_DIR)
- cmd = "%s update" % self.type
- status, output = commands.getstatusoutput(cmd)
- os.chdir(path)
- return status == 0
-
-class Pkg(Module):
-
- def __init__(self, name, branch='devel', scm=None):
- """
- Open a given project
- """
- log.debug("Pkg(%s, %s, %s)" % (name, branch, scm))
- global stack, prompt
- self.name = name
- self.branch = branch
- self.scm = scm
- stack += [self]
- prompt += [name, branch]
- self.__find_scm()
-
- def __find_scm(self):
- global repos
- if self.scm:
- if repos.has_key(self.scm):
- self.scm = SCM(scm, repos[scm])
- else:
- log.debug("TODO: use specified SCM")
- else:
- # Try and find our corresponding SCM.. trial-and-error style.
- for scm in [SCM(repo, url) for repo, url in repos.items()]:
- log.debug("checking with SCM %s" % scm)
- if scm.co(self.name, self.branch):
- self.scm = scm
- log.debug("Found relevant scm: %s" % scm)
- break
- if not self.scm:
- log.error("Cannot find corresponding SCM for %s" % self.name)
-
- def spec(self):
- """
- View the RPM spec file for this project
- """
- editor = os.getenv('EDITOR', 'vim')
- os.system("%s %s/%s.spec" % (editor, os.path.join(FEDORA_DIR, self.name,
- self.branch), self.name))
-
- def patches(self):
- """
- List all patches applied against this package
- """
- self.scm.patches()
-
- def log(self, item=None):
- """
- Show the log
- """
- self.scm.log(item)
-
- def patch(self, num):
- self.scm.patch(num)
-
- def diff(self):
- raise NotImplementedError
-
- def prep(self):
- raise NotImplementedError
-
- def build(self):
- raise NotImplementedError
-
- def srpm(self):
- raise NotImplementedError
-
- def qa(self):
- raise NotImplementedError
-
- def bugs(self):
- raise NotImplementedError
-
-class Bugs(Module):
- """
- Interface for doing useful things with Bugs.
- TODO: add support for arbitrary bug trackers!
- python-bugzilla integration!
- """
- def view(self, data):
- """
- View a given bug number, or show all bugs for a given component
- Example: bugs view #1234, bugs view nethack
- """
- if data[0] == '#':
- os.system('firefox "https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=%s"' % data[1:])
- else:
- os.system('firefox "https://bugzilla.redhat.com/bugzilla/buglist.cgi?product=Fedora+Core&component=%s&bug_status=NEW&bug_status=ASSIGNED&bug_status=REOPENED&bug_status=MODIFIED&short_desc_type=allwordssubstr&short_desc=&long_desc_type=allwordssubstr&long_desc="' % data)
-
- def search(self, text):
- """
- Search bugzilla for a given keyword
- """
- os.system('firefox "https://bugzilla.redhat.com/buglist.cgi?query_format=specific&order=bugs.bug_id&bug_status=__open__&product=&content=%s"' % text)
+ """ Our parent class for all command modules """
+ pass
def load_modules():
global modules
from inspect import isclass
log.debug("Loading modules")
- for key in globals().keys():
- module = globals()[key]
- if isclass(module):
- if issubclass(module, Module) and key != 'Module':
- # Cache reference to class, *not* an instance. We'll instantiate
- # modules on the fly when we need them
- log.debug(" * %s" % key)
- modules[key.lower()] = module
+ for f in os.listdir(os.path.abspath('modules')):
+ module_name, ext = os.path.splitext(f)
+ if ext == '.py':
+ exec "from modules import %s as module" % module_name
+ for item in dir(module):
+ obj = getattr(module, item)
+ if item[0] != '_' and isclass(obj):
+ modules[item.lower()] = obj
+ log.info(" * %s" % item)
+ del module
def print_docstrings(module=None):
"""
@@ -359,86 +101,127 @@ def shell():
except (EOFError, KeyboardInterrupt):
print
break
-
if not data: continue
if data in ('quit', 'exit'): break
keyword = data.split()[0]
- data = data.split()[1:]
-
+ args = data.split()[1:]
+
# Show the docstrings to all methods for all loaded modules
if keyword == 'help':
print_docstrings()
-
+
# Go up a module in our stack
- elif keyword == 'up':
+ elif keyword in ('up', 'cd..', 'cd ..'):
stack = stack[:-1]
prompt = prompt[:-1]
-
+
# Show the docstrings for all methods in our current module
- elif keyword in ('ls', 'help'):
+ elif keyword in ('ls', 'help', '?'):
if len(stack):
print_docstrings(stack[-1])
else:
print_docstrings()
-
+
# Flush our module stack
elif keyword == 'cd':
stack = []
prompt = ['\033[34;1mfedora\033[0m']
-
+
# iPython. Never leave home without it.
elif keyword in ('py', 'python'):
os.system("ipython -noconfirm_exit")
-
- # Do some intelligent handling of unknown input.
- # For the given input 'foo bar', we first check if we have the
- # module 'foo' loaded, then we push it on the top of our module
- # stack and check if it has the 'bar' attribute. If so, we call
- # it with any remaining arguments
else:
- args = []
- top = None
-
- # See if our keyword is a global module
- if keyword in modules.keys():
- log.debug("%s is a module!" % keyword)
- #top = modules[keyword]
- if len(data) and not hasattr(modules[keyword], data[0]):
- log.debug("Loading %s.__init__(%s)" % (keyword, data))
- top = modules[keyword](*data)
- #stack += [top]
- #prompt += [keyword] + data
- continue
- else:
- log.debug("Loading %s" % keyword)
- try:
- top = modules[keyword]()
- stack += [top]
- prompt += [keyword]
- except TypeError, e:
- log.error("%s.%s" % (keyword, e))
- continue
-
- # Try the top module on our stack
if len(stack):
- if hasattr(stack[-1], keyword):
- log.debug("Current module has %s" % keyword)
- top = getattr(stack[-1], keyword)
-
- # Iterate over the rest of our arguments, either going deeper
- # into the top module, or appending tokens to our args list.
- for value in data:
- if hasattr(top, value):
- log.debug("next property %s found in %s" % (value, top))
- top = getattr(top, value)
- elif top:
- log.debug("adding argument: %s" % value)
- args.append(value)
- if top:
- log.debug("calling %s with %s" % (top, args))
- top(*args)
+ top = stack[-1]
+ else:
+ top = None
+ output, top, params = do_command(data.split(), top)
+ stack += [top]
+ prompt += [top.__class__.__name__] + params
+
+class ModuleError(Exception):
+ def __init__(self, reason, error):
+ self.reason = reason
+ self.error = error
+
+def load_module(name, data=[]):
+# stack = []
+# prompt = []
+ top = None
+ try:
+ log.debug("Loading %s.__init__(%s)" % (name, data))
+ top = modules[name](*data)
+# stack += [top]
+# prompt += [name] + data
+ except TypeError, e:
+ log.debug('got type error')
+ log.error("%s: %s" % (name, e))
+ raise ModuleError('probably bad call to __init__' , e)
+ return top
+
+
+def do_command(data, top=None):
+
+ # Do some intelligent handling of unknown input.
+ # For the given input 'foo bar', we first check if we have the
+ # module 'foo' loaded, then we push it on the top of our module
+ # stack and check if it has the 'bar' attribute. If so, we call
+ # it with any remaining arguments
+ from collections import deque
+ data = deque(data)
+ log.debug(data)
+ params = []
+
+ module = None
+ while len(data):
+ if not top:
+ log.debug('not top')
+ mod = data.popleft()
+ try:
+ module = modules[mod]
+ except KeyError, e:
+ log.error('%s is not a known module' % e.message)
+ return None, None, None
+ log.debug('mod is %s' % mod)
+ params = []
+ while len(data):
+ log.debug('params so far %s' % params)
+ log.debug('inner loop %s' % data)
+ param = data.popleft()
+ if hasattr(module, param):
+ data.appendleft(param)
+ break
+ params += [param]
+ try:
+ top = module = load_module(mod, params)
+ mod_params = params
+ loaded_module = True
+ except ModuleError, e:
+ log.debug(e.reason)
+ break
+ else:
+ log.debug('is top')
+ params = []
+ log.debug('params so far %s' % params)
+ param = data.popleft()
+ if hasattr(top, param):
+ log.debug("next property %s found in %s" % (param, top))
+ top = getattr(top, param)
+ else:
+ log.debug("adding argument: %s" % param)
+ params += [param]
+ output = None
+ if len(params):
+ output = top(*params)
+
+ if module:
+ return output, module, mod_params
+ else:
+ return output, None, None
+
def main():
+ global log
from optparse import OptionParser
parser = OptionParser(usage="%prog [options]",
version="%s %s" % ('%prog', __version__),
@@ -466,7 +249,6 @@ def main():
load_modules()
shell()
-
-
+
if __name__ == '__main__':
- main()
+ main() \ No newline at end of file