path: root/git_taskrepo
diff options
authorBill Peck <>2015-05-05 14:12:12 -0400
committerBill Peck <>2015-05-05 14:12:12 -0400
commita86d5c0a2815ce2bc288a76ff6edc103ff8eb3a5 (patch)
treec49e3624b55ee96db6a79fb8fb6edf79637eed43 /git_taskrepo
initial commit
Diffstat (limited to 'git_taskrepo')
9 files changed, 1828 insertions, 0 deletions
diff --git a/git_taskrepo/ b/git_taskrepo/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/git_taskrepo/
diff --git a/git_taskrepo/ b/git_taskrepo/
new file mode 100644
index 0000000..9ab2af9
--- /dev/null
+++ b/git_taskrepo/
@@ -0,0 +1,394 @@
+# -*- coding: utf-8 -*-
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+# This is a copy and paste of various parts of kobo 0.4.2-1 needed to support the
+# creation of kobo commands
+import sys
+import os
+import optparse
+from optparse import Option
+import sqlite3 as lite
+from git import Repo
+def username_prompt(prompt=None, default_value=None):
+ """Ask for a username."""
+ if default_value is not None:
+ return default_value
+ prompt = prompt or "Enter your username: "
+ print >>sys.stderr, prompt,
+ return sys.stdin.readline()
+def password_prompt(prompt=None, default_value=None):
+ """Ask for a password."""
+ import getpass
+ if default_value is not None:
+ return default_value
+ prompt = prompt or "Enter your password: "
+ try:
+ # try to use stderr stream
+ result = getpass.getpass(prompt, stream=sys.stderr)
+ except TypeError:
+ # fall back to stdout
+ result = getpass.getpass(prompt)
+ return result
+def yes_no_prompt(prompt, default_value=None):
+ """Give a yes/no (y/n) question."""
+ if default_value is not None:
+ if default_value not in ("Y", "N"):
+ raise ValueError("Invalid default value: %s" % default_value)
+ default_value = default_value.upper()
+ prompt = "%s [%s/%s]: " % (prompt, ("y", "Y")[default_value == "Y"], ("n", "N")[default_value == "N"])
+ print >>sys.stderr, prompt,
+ while True:
+ user_input = sys.stdin.readline().strip().upper()
+ if user_input == "" and default_value is not None:
+ user_input = default_value
+ if user_input == "Y":
+ return True
+ if user_input == "N":
+ return False
+def are_you_sure_prompt(prompt=None):
+ """Give a yes/no (y/n) question."""
+ prompt = prompt or "Are you sure? Enter 'YES' to continue: "
+ print >>sys.stderr, prompt,
+ user_input = sys.stdin.readline().strip()
+ if user_input == "YES":
+ return True
+ return False
+class Plugin(object):
+ """A plugin base class."""
+ author = None
+ version = None
+ enabled = False
+ def __getattr__(self, name):
+ """
+ Get missing attribute from a container.
+ This is quite hackish but it allows to define settings and methods per container.
+ """
+ return getattr(self.container, name)
+class Command(Plugin):
+ """An abstract class representing a command for CommandOptionParser."""
+ enabled = False
+ admin = False
+ username_prompt = staticmethod(username_prompt)
+ password_prompt = staticmethod(password_prompt)
+ yes_no_prompt = staticmethod(yes_no_prompt)
+ are_you_sure_prompt = staticmethod(are_you_sure_prompt)
+ def __init__(self, parser):
+ Plugin.__init__(self)
+ self.parser = parser
+ def options(self):
+ """Add options to self.parser."""
+ pass
+ def run(self, *args, **kwargs):
+ """Run a command. Arguments contain parsed options."""
+ raise NotImplementedError()
+ def set_repo(self, **kwargs):
+ self.container.set_repo(**kwargs)
+ def set_taskrepo(self, **kwargs):
+ self.container.set_taskrepo(**kwargs)
+class PluginContainer(object):
+ """A plugin container.
+ Usage: Inherit PluginContainer and register plugins to the new class.
+ """
+ def __getitem__(self, name):
+ return self._get_plugin(name)
+ def __iter__(self):
+ return self.plugins.iterkeys()
+ @classmethod
+ def normalize_name(cls, name):
+ return name
+ @classmethod
+ def _get_plugins(cls):
+ """Return dictionary of registered plugins."""
+ result = {}
+ parent_plugins = cls._get_parent_plugins(cls.normalize_name).items()
+ class_plugins = getattr(cls, "_class_plugins", {}).items()
+ for name, plugin_class in parent_plugins + class_plugins:
+ result[name] = type(plugin_class.__name__, (plugin_class, ), {"__doc__": plugin_class.__doc__})
+ return result
+ @classmethod
+ def _get_parent_plugins(cls, normalize_function):
+ result = {}
+ for parent in cls.__bases__:
+ if parent is PluginContainer:
+ # don't use PluginContainer itself - plugins have to be registered to subclasses
+ continue
+ if not issubclass(parent, PluginContainer):
+ # skip parents which are not PluginContainer subclasses
+ continue
+ # read inherited plugins first (conflicts are resolved recursively)
+ plugins = parent._get_parent_plugins(normalize_function)
+ # read class plugins, override inherited on name conflicts
+ if hasattr(parent, "_class_plugins"):
+ for plugin_class in parent._class_plugins.values():
+ normalized_name = normalize_function(plugin_class.__name__)
+ plugins[normalized_name] = plugin_class
+ for name, value in plugins.iteritems():
+ if result.get(name, value) != value:
+ raise RuntimeError("Cannot register plugin '%s'. Another plugin with the same normalized name (%s) is already in the container." % (str(value), normalized_name))
+ result.update(plugins)
+ return result
+ @property
+ def plugins(self):
+ if not hasattr(self, "_plugins"):
+ self._plugins = self.__class__._get_plugins()
+ return self._plugins
+ def _get_plugin(self, name):
+ """Return a plugin or raise KeyError."""
+ normalized_name = self.normalize_name(name)
+ if normalized_name not in self.plugins:
+ raise KeyError("Plugin not found: %s" % normalized_name)
+ plugin = self.plugins[normalized_name]
+ plugin.container = self
+ plugin.normalized_name = normalized_name
+ return plugin
+ @classmethod
+ def register_plugin(cls, plugin):
+ """Register a new plugin. Return normalized plugin name."""
+ if cls is PluginContainer:
+ raise TypeError("Can't register plugin to the PluginContainer base class.")
+ if "_class_plugins" not in cls.__dict__:
+ cls._class_plugins = {}
+ if not getattr(plugin, "enabled", False):
+ return
+ normalized_name = cls.normalize_name(plugin.__name__)
+ cls._class_plugins[normalized_name] = plugin
+ return normalized_name
+ @classmethod
+ def register_module(cls, module, prefix=None, skip_broken=False):
+ """Register all plugins in a module's sub-modules.
+ @param module: a python module that contains plugin sub-modules
+ @type module: module
+ @param prefix: if specified, only modules with this prefix will be processed
+ @type prefix: str
+ @param skip_broken: skip broken sub-modules and print a warning
+ @type skip_broken: bool
+ """
+ path = os.path.dirname(module.__file__)
+ module_list = []
+ for fn in os.listdir(path):
+ if not fn.endswith(".py"):
+ continue
+ if fn.startswith("_"):
+ continue
+ if prefix and not fn.startswith(prefix):
+ continue
+ if not os.path.isfile(os.path.join(path, fn)):
+ continue
+ module_list.append(fn[:-3])
+ if skip_broken:
+ for mod in module_list[:]:
+ try:
+ __import__(module.__name__, {}, {}, [mod])
+ except:
+ import sys
+ print >> sys.stderr, "WARNING: Skipping broken plugin module: %s.%s" % (module.__name__, mod)
+ module_list.remove(mod)
+ else:
+ __import__(module.__name__, {}, {}, module_list)
+ for mn in module_list:
+ mod = getattr(module, mn)
+ for pn in dir(mod):
+ plugin = getattr(mod, pn)
+ if type(plugin) is type and issubclass(plugin, Plugin) and plugin is not Plugin:
+ cls.register_plugin(plugin)
+class CommandContainer(PluginContainer):
+ """Container for Command classes."""
+ @classmethod
+ def normalize_name(cls, name):
+ """Replace some characters in command names."""
+ return name.lower().replace('_', '-').replace(' ', '-')
+ def set_repo(self, **kwargs):
+ self.repo = Repo(os.getcwd(), search_parent_directories=True)
+ def set_taskrepo(self, init=False, **kwargs):
+ # do some work to find the default location for the db
+ db_location = os.path.join(self.repo.working_tree_dir, "taskrepo.db")
+ if not init and not os.path.exists(db_location):
+ raise ValueError(u'Not a valid taskrepo, please init first! (git taskrepo init)')
+ conn = lite.connect(db_location)
+ self.taskrepo = conn
+class CommandOptionParser(optparse.OptionParser):
+ """Enhanced OptionParser with plugin support."""
+ def __init__(self,
+ usage=None,
+ option_list=None,
+ option_class=Option,
+ version=None,
+ conflict_handler="error",
+ description=None,
+ formatter=None,
+ add_help_option=True,
+ prog=None,
+ command_container=None,
+ default_command="help",
+ add_username_password_options=False):
+ usage = usage or "%prog <command> [args] [--help]"
+ self.container = command_container
+ self.default_command = default_command
+ self.command = None
+ formatter = formatter or optparse.IndentedHelpFormatter(max_help_position=33)
+ optparse.OptionParser.__init__(self, usage, option_list, option_class, version, conflict_handler, description, formatter, add_help_option, prog)
+ if add_username_password_options:
+ option_list = [
+ optparse.Option("--username", help="specify user"),
+ optparse.Option("--password", help="specify password"),
+ ]
+ self._populate_option_list(option_list, add_help=False)
+ def print_help(self, file=None, admin=False):
+ if file is None:
+ file = sys.stdout
+ file.write(self.format_help())
+ if self.command in (None, "help", "help-admin"):
+ file.write("\n")
+ file.write(self.format_help_commands(admin=admin))
+ def format_help_commands(self, admin=False):
+ commands = []
+ admin_commands = []
+ for name, plugin in sorted(self.container.plugins.iteritems()):
+ is_admin = getattr(plugin, "admin", False)
+ text = " %-30s %s" % (name, plugin.__doc__ or "")
+ if is_admin:
+ if admin:
+ admin_commands.append(text)
+ else:
+ commands.append(text)
+ if commands:
+ commands.insert(0, "commands:")
+ commands.append("")
+ if admin_commands:
+ admin_commands.insert(0, "admin commands:")
+ admin_commands.append("")
+ return "\n".join(commands + admin_commands)
+ def parse_args(self, args=None, values=None):
+ """return (command_instance, opts, args)"""
+ args = self._get_args(args)
+ command = None
+ if len(args) > 0 and not args[0].startswith("-"):
+ command = args[0]
+ args = args[1:]
+ else:
+ command = self.default_command
+ # keep args as is
+ if not command in self.container.plugins:
+ self.error("unknown command: %s" % command)
+ CommandClass = self.container[command]
+ cmd = CommandClass(self)
+ if self.command != cmd.normalized_name:
+ self.command = cmd.normalized_name
+ cmd.options()
+ cmd_opts, cmd_args = optparse.OptionParser.parse_args(self, args, values)
+ return (cmd, cmd_opts, cmd_args)
+ def run(self, args=None, values=None):
+ """parse arguments and run a command"""
+ cmd, cmd_opts, cmd_args = self.parse_args(args, values)
+ cmd_kwargs = cmd_opts.__dict__
+*cmd_args, **cmd_kwargs)
+class Help(Command):
+ """show this help message and exit"""
+ enabled = True
+ def options(self):
+ pass
+ def run(self, *args, **kwargs):
+ self.parser.print_help(admin=False)
+class Help_Admin(Command):
+ """show help message about administrative commands and exit"""
+ enabled = True
+ def options(self):
+ # override default --help option
+ opt = self.parser.get_option("--help")
+ opt.action = "store_true"
+ opt.dest = "help"
+ def run(self, *args, **kwargs):
+ self.parser.print_help(admin=True)
diff --git a/git_taskrepo/ b/git_taskrepo/
new file mode 100755
index 0000000..b93797d
--- /dev/null
+++ b/git_taskrepo/
@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+import os
+import sys
+from optparse import Option, IndentedHelpFormatter, SUPPRESS_HELP
+from command import CommandOptionParser, CommandContainer
+import git.exc
+__version__ = '0.1'
+__all__ = (
+ "main",
+class BeakerOptionParser(CommandOptionParser):
+ standard_option_list = [
+ ]
+# register default command plugins
+import git_taskrepo.sub_commands
+CommandContainer.register_module(git_taskrepo.sub_commands, prefix="cmd_")
+def main():
+ command_container = CommandContainer()
+ formatter = IndentedHelpFormatter(max_help_position=60, width=120)
+ parser = BeakerOptionParser(version=__version__,
+ conflict_handler='resolve',
+ command_container=command_container,
+ default_command="help", formatter=formatter)
+ # Need to deal with the possibility that requests is not importable...
+ try:
+ import requests
+ maybe_http_error = (requests.HTTPError,)
+ except ImportError:
+ maybe_http_error = ()
+ # This is, but with more sensible error handling
+ cmd, cmd_opts, cmd_args = parser.parse_args()
+ try:
+ return*cmd_args, **cmd_opts.__dict__)
+ except git.exc.InvalidGitRepositoryError, e:
+ sys.stderr.write('Not a valid git repo: %s\n' % e)
+ return 1
+ except git.exc, e:
+ sys.stderr.write('GIT error: %s\n' % e)
+ return 1
+ except ValueError, e:
+ sys.stderr.write('%s\n' % e)
+if __name__ == '__main__':
+ sys.exit(main())
diff --git a/git_taskrepo/sub_commands/ b/git_taskrepo/sub_commands/
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/git_taskrepo/sub_commands/
diff --git a/git_taskrepo/sub_commands/ b/git_taskrepo/sub_commands/
new file mode 100644
index 0000000..592f7d3
--- /dev/null
+++ b/git_taskrepo/sub_commands/
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+from git_taskrepo.command import Command
+from git_taskrepo.testinfo import ParserError
+from git_taskrepo.taskrepo import update_taskrepo, parse_testinfo, TRX_Parse, TRX_TestInfo
+import sys, os, commands
+import sqlite3
+def update_file(filename, line_to_add):
+ seen=False
+ updated=False
+ if os.path.exists(filename):
+ with open(filename) as f:
+ for line in f:
+ line = line.rstrip()
+ if line == line_to_add:
+ seen=True
+ break
+ if seen == False:
+ with open(filename,'a') as f:
+ f.write("%s\n" % line_to_add)
+ updated=True
+ return updated
+class Init(Command):
+ """Init Task Repo"""
+ enabled = True
+ def options(self):
+ self.parser.usage = "%%prog %s" % self.normalized_name
+ self.parser.add_option(
+ "--origin",
+ default=None,
+ help="Specify a read only origin. This is needed if your current origin is ssh://"
+ )
+ self.parser.add_option(
+ "--no-import",
+ default=False,
+ action="store_true",
+ help="Do not automatically import all tasks."
+ )
+ def run(self, *args, **kwargs):
+ # get our repo handler
+ self.set_repo(**kwargs)
+ # make sure origin is usable without authentication
+ if not kwargs.get("origin") and not getattr(self.repo.remotes, 'origin', None):
+ self.parser.error("Your git repo doesn't have a origin specified. use --origin")
+ remote = kwargs.get("origin") or self.repo.remotes.origin.url
+ if remote.startswith("ssh://"):
+ self.parser.error("remote origin is %s, you must specify an origin that doesn't need authentication. use --origin" % remote)
+ print("Initializing taskrepo:")
+ # get our taskrepo handler
+ self.set_taskrepo(init=True)
+ # Initialize the DB with our tables
+ with self.taskrepo:
+ cur = self.taskrepo.cursor()
+ # Create tables if needed
+ cur.execute("CREATE TABLE IF NOT EXISTS config(origin TEXT NOT NULL)")
+ cur.execute("CREATE TABLE IF NOT EXISTS key_value_inc(task_id INTEGER, key TEXT NOT NULL, value TEXT NOT NULL)")
+ cur.execute("CREATE TABLE IF NOT EXISTS key_value_exc(task_id INTEGER, key TEXT NOT NULL, value TEXT NOT NULL)")
+ cur.execute("DELETE FROM config")
+ cur.execute("INSERT INTO config(origin) VALUES (?)", (remote,))
+ index = self.repo.index
+ # Add taskrepo.db to .gitignore
+ gitignore = "%s/.gitignore" % self.repo.working_tree_dir
+ if update_file(gitignore, "taskrepo.db"):
+ print(" - Added taskrepo.db to .gitignore")
+ index.add([gitignore])
+ # Add testinfo.desc to .gitignore
+ gitignore = "%s/.gitignore" % self.repo.working_tree_dir
+ if update_file(gitignore, "testinfo.desc"):
+ print(" - Added testinfo.desc to .gitignore")
+ index.add([gitignore])
+ # if we updated the repo then commit it
+ if self.repo.is_dirty():
+ assert index.commit("Initialized to use git taskrepo.").type == 'commit'
+ print(" - committed to git")
+ # Add git hooks to automatically update taskrepo.db
+ post_commit_hook = """\
+echo "post-commit"
+git diff --name-only HEAD@{1} HEAD | \
+while read file; do
+ echo "$file" | grep 'Makefile$' -q
+ if [ $? -eq 0 ]; then
+ dirname=$(dirname $file)
+ git taskrepo update $dirname
+ fi
+ if not os.path.exists("%s/hooks/post-commit" % self.repo.git_dir):
+ with open("%s/hooks/post-commit" % self.repo.git_dir,'w') as f:
+ f.write(post_commit_hook)
+ os.chmod("%s/hooks/post-commit" % self.repo.git_dir, 0755)
+ print(" - Installed post-commit hook")
+ post_checkout_hook = """\
+echo "post-checkout"
+git diff --name-only $1 $2 | \
+while read file; do
+ echo "$file" | grep 'Makefile$' -q
+ if [ $? -eq 0 ]; then
+ dirname=$(dirname $file)
+ git taskrepo update $dirname
+ fi
+ if not os.path.exists("%s/hooks/post-checkout" % self.repo.git_dir):
+ with open("%s/hooks/post-checkout" % self.repo.git_dir,'w') as f:
+ f.write(post_checkout_hook)
+ os.chmod("%s/hooks/post-checkout" % self.repo.git_dir, 0755)
+ print(" - Installed post-checkout hook")
+ # walk the git repo from working_tree_dir and import all tasks
+ # unless option --no-import was passed in.
+ if kwargs.get("no_import") is False:
+ print(" - Importing tasks into taskrepo")
+ for (dirpath, dirnames, filenames) in os.walk(self.repo.working_tree_dir):
+ try:
+ update_taskrepo(self.repo, self.taskrepo, dirpath)
+ except ParserError, e:
+ print >> sys.stderr, (" - %s FAIL (%s)." % (dirpath, e))
+ except TRX_TestInfo:
+ pass
+ else:
+ print(" - %s Imported." % dirpath)
+ print("Done!")
diff --git a/git_taskrepo/sub_commands/ b/git_taskrepo/sub_commands/
new file mode 100644
index 0000000..3381da5
--- /dev/null
+++ b/git_taskrepo/sub_commands/
@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+from git_taskrepo.command import Command
+class List(Command):
+ """List Tasks"""
+ enabled = True
+ def options(self):
+ self.parser.usage = "%%prog %s" % self.normalized_name
+ self.parser.add_option(
+ "--type",
+ metavar="TYPE",
+ action="append",
+ help="List tasks only of TYPE")
+ def run(self, *args, **kwargs):
+ self.set_repo(**kwargs)
+ self.set_taskrepo(**kwargs)
+ conn = self.taskrepo
+ with conn:
+ cur = conn.cursor()
+ cur.execute("SELECT origin FROM config")
+ result = cur.fetchone()
+ origin = result[0]
+ values = []
+ joins = []
+ where = []
+ extra = ""
+ if kwargs.get("type"):
+ for x in range(0, len(kwargs.get("type"))):
+ joins.append("LEFT JOIN key_value_inc AS kvi_%d ON kvi_%d.task_id =" % (x, x))
+ where.append("kvi_%d.key='types' AND kvi_%d.value=?" % (x, x))
+ values.append(kwargs.get("type")[x])
+ if where:
+ extra = ' '.join(joins)
+ extra = "%s WHERE %s" % (extra, ' AND '.join(where))
+ cur.execute("SELECT name FROM tasks %s" % extra, values)
+ rows = cur.fetchall()
+ for row in rows:
+ print "%s" % row[0]
diff --git a/git_taskrepo/sub_commands/ b/git_taskrepo/sub_commands/
new file mode 100644
index 0000000..91f30db
--- /dev/null
+++ b/git_taskrepo/sub_commands/
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+import sys, os
+from git_taskrepo.command import Command
+from git_taskrepo.taskrepo import update_taskrepo, parse_testinfo, TRX
+class Update(Command):
+ """Update Task Repo"""
+ enabled = True
+ def options(self):
+ self.parser.usage = "%%prog %s [<path/to/task>]" % self.normalized_name
+ def run(self, *args, **kwargs):
+ self.set_repo(**kwargs)
+ self.set_taskrepo(**kwargs)
+ if len(args) >= 1:
+ taskpath = os.path.normpath(os.path.join(os.getcwd(), args[0]))
+ else:
+ taskpath = os.getcwd()
+ sys.stderr.write("[TaskRepo] Updating %s ... " % taskpath)
+ try:
+ update_taskrepo(self.repo, self.taskrepo, taskpath)
+ except TRX, e:
+ sys.stderr.write("FAIL (%s).\n" % e)
+ sys.exit(1)
+ else:
+ sys.stderr.write("done.\n")
diff --git a/git_taskrepo/ b/git_taskrepo/
new file mode 100644
index 0000000..9028101
--- /dev/null
+++ b/git_taskrepo/
@@ -0,0 +1,81 @@
+from git_taskrepo.testinfo import SemiStrictParser, ParserError
+import string
+import os
+import commands
+# Keys from testinfo to populate with
+keys = ("test_archs",
+ "releases",
+ "runfor",
+ "types",
+ "bugs",
+ )
+singles = ("test_name",
+ "test_description",
+ "owner",
+ )
+class TaskRepoException(Exception):
+ pass
+class TRX(TaskRepoException):
+ pass
+class TRX_Parse(TRX):
+ pass
+class TRX_TestInfo(TRX):
+ pass
+def only_ascii(s):
+ return filter(lambda x: x in string.printable, s)
+def _update_taskrepo(taskrepo, taskname, testinfo):
+ with taskrepo:
+ cur = taskrepo.cursor()
+ # Do we already have an entry for this task?
+ cur.execute("SELECT id FROM tasks WHERE name=?", (taskname,))
+ result = cur.fetchone()
+ if result:
+ taskid = result[0]
+ else:
+ cur.execute("INSERT INTO tasks(name) VALUES (?)", (taskname,))
+ taskid = cur.lastrowid;
+ # Clear old values
+ cur.execute("DELETE FROM key_value_inc WHERE task_id=?", (taskid,))
+ cur.execute("DELETE FROM key_value_exc WHERE task_id=?", (taskid,))
+ # Populate with new values
+ for key in keys:
+ for value in getattr(testinfo, key):
+ if value and type(value) == type(str()) and value[0] == '-':
+ table = "key_value_exc"
+ value = value[1:]
+ else:
+ table = "key_value_inc"
+ cur.execute("INSERT INTO %s VALUES (?,?,?)" % table, (taskid, key, value))
+ for key in singles:
+ value = only_ascii(getattr(testinfo, key))
+ cur.execute("INSERT INTO key_value_inc VALUES (?,?,?)", (taskid, key, value))
+def update_taskrepo(repo, taskrepo, taskpath):
+ if os.path.exists(os.path.join(taskpath, "Makefile")):
+ (status, output) = commands.getstatusoutput("make -C %s -q testinfo.desc" % taskpath)
+ if status == 1:
+ os.system("make -C %s testinfo.desc" % taskpath)
+ if os.path.exists(os.path.join(taskpath, "testinfo.desc")):
+ taskname = taskpath.split(repo.working_tree_dir)[1]
+ try:
+ testinfo = parse_testinfo(os.path.join(taskpath, "testinfo.desc"))
+ except ParserError:
+ raise
+ _update_taskrepo(taskrepo, taskname, testinfo)
+ else:
+ raise TRX_TestInfo('No testinfo.desc')
+def parse_testinfo(filename):
+ p = SemiStrictParser(True)
+ p.parse(open(filename).readlines())
+ return
diff --git a/git_taskrepo/ b/git_taskrepo/
new file mode 100644
index 0000000..e7120c7
--- /dev/null
+++ b/git_taskrepo/
@@ -0,0 +1,1070 @@
+# Copyright (c) 2006 Red Hat, Inc.
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 2 of
+# the License, or (at your option) any later version.
+# This program is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied
+# PURPOSE. See the GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see
+# Author: David Malcolm
+# IMPORTANT. This file ( still remains in rhts as well.
+# When making any changes to this file, please assess what changes (if any) are needed
+# to be made in the corresponding file in rhts.
+import re
+import unittest
+import tempfile
+import sys
+namespaces = [ ('desktop', ['evolution', '', 'poppler', 'shared-mime-info']),
+ ('tools', ['gcc']),
+ ('CoreOS', ['rpm']),
+ ('cluster', []),
+ ('rhn', []) ]
+def get_namespace_for_package(packageName):
+ for (namespace, packages) in namespaces:
+ if packageName in packages:
+ return namespace
+ # not found:
+ return None
+class TestInfo:
+ """Class representing metadata about a test, suitable for outputting as a
+ testinfo.desc file"""
+ def __init__(self):
+ self.test_name = None
+ self.test_description = None
+ self.test_archs = []
+ self.owner = None
+ self.testversion = None
+ self.releases = []
+ self.priority = None
+ self.destructive = None
+ self.license = None
+ self.confidential = None
+ self.avg_test_time = None
+ self.test_path = None
+ self.requires = []
+ self.rhtsrequires = []
+ self.runfor = []
+ self.bugs = []
+ self.types = []
+ self.needs = []
+ self.need_properties = []
+ self.siteconfig = []
+ self.kickstart = None
+ self.options = []
+ self.environment = {}
+ self.provides = []
+ def output_string_field(self, file, fileFieldName, dictFieldName):
+ value = self.__dict__[dictFieldName]
+ if value:
+ file.write('%s: %s\n'%(fileFieldName, value))
+ def output_string_list_field(self, file, fileFieldName, dictFieldName):
+ value = self.__dict__[dictFieldName]
+ if value:
+ file.write('%s: %s\n'%(fileFieldName, ' '.join(value)))
+ def output_string_dict_field(self, file, fileFieldName, dictFieldName):
+ value = self.__dict__[dictFieldName]
+ if value:
+ for key, val in value.items():
+ if val:
+ file.write('%s: %s=%s\n'%(fileFieldName, key, val))
+ def output_bool_field(self, file, fileFieldName, dictFieldName):
+ value = self.__dict__[dictFieldName]
+ if value is not None:
+ if value:
+ strValue = "yes"
+ else:
+ strValue = "no"
+ file.write('%s: %s\n'%(fileFieldName, strValue))
+ def output(self, file):
+ """
+ Write out a testinfo.desc to the file object
+ """
+ self.output_string_field(file, 'Name', 'test_name')
+ self.output_string_field(file, 'Description', 'test_description')
+ self.output_string_list_field(file, 'Architectures', 'test_archs')
+ self.output_string_field(file, 'Owner', 'owner')
+ self.output_string_field(file, 'TestVersion', 'testversion')
+ self.output_string_list_field(file, 'Releases', 'releases')
+ self.output_string_field(file, 'Priority', 'priority')
+ self.output_bool_field(file, 'Destructive', 'destructive')
+ self.output_string_field(file, 'License', 'license')
+ self.output_bool_field(file, 'Confidential', 'confidential')
+ self.output_string_field(file, 'TestTime', 'avg_test_time')
+ self.output_string_field(file, 'Path', 'test_path')
+ self.output_string_list_field(file, 'Requires', 'requires')
+ self.output_string_list_field(file, 'RhtsRequires', 'rhtsrequires')
+ self.output_string_list_field(file, 'RunFor', 'runfor')
+ self.output_string_list_field(file, 'Bugs', 'bugs')
+ self.output_string_list_field(file, 'Type', 'types')
+ self.output_string_list_field(file, 'RhtsOptions', 'options')
+ self.output_string_dict_field(file, 'Environment', 'environment')
+ self.output_string_list_field(file, 'Provides', 'provides')
+ for (name, op, value) in self.need_properties:
+ file.write('NeedProperty: %s %s %s\n'%(name, op, value))
+ file.write(self.generate_siteconfig_lines())
+ def generate_siteconfig_lines(self):
+ result = ""
+ for (arg, description) in self.siteconfig:
+ if self.test_name:
+ if arg.startswith(self.test_name):
+ # Strip off common prefix:
+ arg = arg[len(self.test_name)+1:]
+ result += 'SiteConfig(%s): %s\n'%(arg, description)
+ return result
+class Validator:
+ """
+ Abstract base class for validating fields
+ """
+ pass
+class RegexValidator(Validator):
+ def __init__(self, pattern, message):
+ self.pattern = pattern
+ self.msg = message
+ def is_valid(self, value):
+ return re.match(self.pattern, value)
+ def message(self):
+ return self.msg
+# This is specified in RFC2822 Section 3.4,
+# we accept only the most common variations
+class NameAddrValidator(RegexValidator):
+ ATOM_CHARS = r"\w!#$%&'*+-/=?^_`{|}~"
+ PHRASE = r' *[%s][%s ]*' % (ATOM_CHARS, ATOM_CHARS)
+ ADDR_SPEC = r'[%s.]+@[%s.]+' % (ATOM_CHARS, ATOM_CHARS)
+ NAME_ADDR = r'%s<%s> *' % (PHRASE, ADDR_SPEC)
+ def __init__(self):
+ RegexValidator.__init__(self, self.NAME_ADDR,
+ 'should be a valid RFC2822 name_addr, '
+ 'such as John Doe <>')
+class ListValidator(Validator):
+ def __init__(self, validValues):
+ self.validValues = validValues
+ def is_valid(self, value):
+ return value in self.validValues
+ def message(self):
+ errorMsg = 'valid values are'
+ for value in self.validValues:
+ errorMsg += ' "%s"'%value
+ return errorMsg
+class DashListValidator(ListValidator):
+ def is_valid(self, value):
+ if value.startswith('-'):
+ value = value[1:]
+ return ListValidator.is_valid(self, value)
+ def message(self):
+ return ListValidator.message(self) + " optionally prefixed with '-'"
+class BoolValidator(Validator):
+ def __init__(self):
+ pass
+ def convert(self, value):
+ if re.match("y|yes|1", value):
+ return True
+ if re.match("n|no|0", value):
+ return False
+ return None
+ def is_valid(self, value):
+ return self.convert(value) is not None
+ def message(self):
+ return "boolean value expected"
+class Parser:
+ """
+ Parser for testinfo.desc files
+ """
+ def __init__(self):
+ = TestInfo()
+ # All of these could be populated based on a DB query if we wanted to structure things that way:
+ self.valid_root_ns = [
+ 'distribution',
+ 'installation',
+ 'kernel',
+ 'desktop',
+ 'tools',
+ 'CoreOS',
+ 'cluster',
+ 'rhn',
+ 'examples',
+ 'performance',
+ 'ISV',
+ 'virt'
+ ]
+ self.root_ns_with_mnt_tests_subtree = ['distribution', 'kernel']
+ self.valid_architectures = [
+ 'ia64',
+ 'x86_64',
+ 'ppc',
+ 'ppc64',
+ 'ppc64le',
+ 's390',
+ 's390x',
+ 'i386',
+ 'aarch64',
+ 'arm',
+ 'armhfp',
+ ]
+ self.valid_priorities = [
+ 'Low',
+ 'Medium',
+ 'Normal',
+ 'High',
+ 'Manual'
+ ]
+ self.valid_options = [
+ 'Compatible',
+ 'CompatService',
+ 'StrongerAVC',
+ ]
+ def handle_error(self, message):
+ raise NotImplementedError
+ def handle_warning(self, message):
+ raise NotImplementedError
+ def error_if_not_in_array(self, fieldName, value, validValues):
+ if not value in validValues:
+ errorMsg = '"%s" is not a valid value for %s; valid values are'%(value, fieldName);
+ for validValue in validValues:
+ errorMsg += ' "%s"'%validValue
+ self.handle_error(errorMsg)
+ def __mandatory_field(self, fileFieldName, dictFieldName):
+ if not[dictFieldName]:
+ self.handle_error("%s field not defined"%fileFieldName)
+ def __unique_field(self, fileFieldName, dictFieldName, value, validator=None):
+ if[dictFieldName]:
+ self.handle_error("%s field already defined"%fileFieldName)
+[dictFieldName] = value
+ if validator:
+ if not validator.is_valid(value):
+ self.handle_error('"%s" is not a valid %s field (%s)'%(value, fileFieldName, validator.message()))
+ def __bool_field(self, fileFieldName, dictFieldName, raw_value):
+ validator = BoolValidator()
+ if not validator.is_valid(raw_value):
+ self.handle_error('"%s" is not a valid %s field (%s)'
+ % (raw_value, fileFieldName, validator.message()))
+ value = validator.convert(raw_value)
+ self.__unique_field(fileFieldName, dictFieldName, value)
+ def _handle_dict(self, fileFieldName, dictFieldName, value, validator=None, key_validator=None):
+ kv = value.split("=", 1)
+ if len(kv) < 2:
+ self.handle_error("Malformed %s field not matching KEY=VALUE pattern" % fileFieldName)
+ return
+ k, v = kv
+ d = getattr(, dictFieldName)
+ if d.has_key(k):
+ self.handle_error("%s: Duplicate entry for %r" % (fileFieldName, k))
+ return
+ if key_validator and not key_validator.is_valid(k):
+ self.handle_error('"%s" is not a valid key for %s field (%s)'%(k, fileFieldName, key_validator.message()))
+ return
+ if validator and not validator.is_valid(v):
+ self.handle_error('"%s" is not a valid %s field (%s)'%(v, fileFieldName, validator.message()))
+ return
+ d[k] = kv[1]
+ def _handle_unique_list(self, fileFieldName, dictFieldName, value, validator=None, split_at=" "):
+ l = getattr(, dictFieldName)
+ if l:
+ self.handle_error("%s field already defined"%fileFieldName)
+ return
+ items = value.split(split_at)
+ if validator:
+ for item in items:
+ if not validator.is_valid(item):
+ self.handle_error('"%s" is not a valid %s field (%s)'%(item, fileFieldName, validator.message()))
+ continue
+ l.append(item)
+ else:
+ l.extend(items)
+ def handle_name(self, key, value):
+ self.__unique_field(key, 'test_name', value)
+ if not re.match('^/', value):
+ self.handle_error("Name field does not begin with a forward-slash")
+ return
+ name_frags= value.split('/')
+ #print name_frags
+ root_ns = name_frags[1]
+ = root_ns
+ = "/".join(name_frags[2:])
+ =
+ # print "name_under_root_ns: %s"
+ = name_frags
+ def handle_desc(self, key, value):
+ self.__unique_field(key, 'test_description', value)
+ def handle_owner(self, key, value):
+ # Required one-only email addresses "John Doe <>"
+ # In theory this could be e.g.; too expensive to check for that here
+ self.__unique_field(key, 'owner', value, NameAddrValidator())
+ def handle_testversion(self, key, value):
+ self.__unique_field(key, 'testversion', value, RegexValidator(r'^([A-Za-z0-9\.]*)$', 'can only contain numbers, letters and the dot symbol'))
+ # FIXME: we can probably support underscores as well
+ def handle_license(self, key, value):
+ self.__unique_field(key, 'license', value)
+ def handle_deprecated(self, key, value):
+ self.handle_warning("%s field is deprecated"%key)
+ def handle_releases(self, key, value):
+ self.__unique_field(key, 'releases', value)
+ num_negative_releases = 0
+ num_positive_releases = 0
+ releases = []
+ for release in value.split(" "):
+ #print "Got release: release"
+ releases.append(release)
+ m = re.match('^-(.*)', release)
+ if m:
+ cleaned_release =
+ # print "Got negative release: %s"%cleaned_release
+ num_negative_releases+=1
+ else:
+ cleaned_release = release
+ # print "Got positive release: %s"%release
+ num_positive_releases+=1
+ if num_negative_releases>0 and num_positive_releases>0:
+ self.handle_warning("Releases field lists both negated and non-negated release names (should be all negated, or all non-negated)")
+ = releases
+ def handle_archs(self, key, value):
+ self.__unique_field(key, 'test_archs', value)
+ archs = []
+ for arch in value.split(" "):
+ self.error_if_not_in_array("Architecture", arch, self.valid_architectures)
+ archs.append(arch)
+ = archs
+ def handle_options(self, key, value):
+ self._handle_unique_list(key, 'options', value, DashListValidator(self.valid_options))
+ def handle_environment(self, key, value):
+ self._handle_dict(key, 'environment', value, key_validator=RegexValidator(r'^([A-Za-z_][A-Za-z0-9_]*)$', 'Can contain only letters, numbers and underscore.'))
+ def handle_priority(self, key, value):
+ self.__unique_field(key, 'priority', value, ListValidator(self.valid_priorities))
+ def handle_destructive(self, key, value):
+ self.__bool_field(key, 'destructive', value)
+ def handle_confidential(self, key, value):
+ self.__bool_field(key, 'confidential', value)
+ def handle_testtime(self, key, value):
+ if
+ self.handle_error("%s field already defined"%key)
+ return
+ # TestTime is an integer with an optional minute (m) or hour (h) suffix
+ m = re.match('^(\d+)(.*)$', value)
+ if m:
+ = int(
+ suffix =
+ if suffix == '':
+ pass # no units means seconds
+ elif suffix == 'm':
+ *= 60
+ elif suffix == 'h':
+ *= 3600
+ else:
+ self.handle_warning("TestTime unit is not valid, should be m (minutes) or h (hours)")
+ return
+ if<60:
+ self.handle_warning("TestTime should not be less than a minute")
+ else:
+ self.handle_error("Malformed %s field"%key)
+ def handle_type(self, key, value):
+ for type in value.split(" "):
+ def handle_kickstart(self, key, value):
+ = value
+ def handle_bug(self, key, value):
+ for bug in value.split(" "):
+ # print "Got bug: %s"%bug
+ m = re.match('^([1-9][0-9]*)$', bug)
+ if m:
+ else:
+ self.handle_error('"%s" is not a valid Bug value (should be numeric)'%bug)
+ def handle_path(self, key, value):
+ if
+ self.handle_error("Path field already defined")
+ if re.match(r'^\/mnt\/tests\/', value):
+ absolute_path = value
+ else:
+ if re.match(r'^\/', value):
+ self.handle_error("Path field is absolute but is not below /mnt/tests")
+ # Relative path:
+ absolute_path = "/mnt/tests/"+value
+ = absolute_path
+ def handle_runfor(self, key, value):
+ for pkgname in value.split(" "):
+ def handle_requires(self, key, value):
+ for pkgname in value.split(" "):
+ def handle_rhtsrequires(self, key, value):
+ for pkgname in value.split(" "):
+ def handle_provides(self, key, value):
+ for pkgname in value.split(" "):
+ def handle_needproperty(self, key, value):
+ m = re.match(r'^([A-Za-z0-9]*)\s+(=|>|>=|<|<=)\s+([A-Z:a-z0-9]*)$', value)
+ if m:
+ else:
+ self.handle_error('"%s" is not a valid %s field; %s'%(value, key, "must be of the form PROPERTYNAME {=|>|>=|<|<=} PROPERTYVALUE"))
+ def handle_deprecated_for_needproperty(self, key, value):
+ self.handle_error("%s field is deprecated. Use NeedProperty instead"%key)
+ def __handle_siteconfig(self, arg, value):
+ if re.match('^/.*', arg):
+ # Absolute path:
+ absPath = arg
+ else:
+ # Relative path:
+ if
+ absPath = + '/' + arg
+ else:
+ self.handle_error("A relative SiteConfig(): declaration appeared before a Name: field")
+ return
+ (absPath, value) )
+ def __handle_declaration(self, decl, arg, value):
+ # print 'decl: "%s"'%decl
+ # print 'arg: "%s"'%arg
+ # print 'value: "%s"'%value
+ if decl=="SiteConfig":
+ self.__handle_siteconfig(arg, value)
+ else:
+ self.handle_error('"%s" is not a valid declaration"')
+ def parse(self, lines):
+ # Map from field names to value-parsing methods:
+ fields = {'Name' : self.handle_name,
+ 'Description' : self.handle_desc,
+ 'Notify' : self.handle_deprecated,
+ 'Owner' : self.handle_owner,
+ 'License' : self.handle_license,
+ 'Releases': self.handle_releases,
+ 'Architectures': self.handle_archs,
+ 'RhtsOptions': self.handle_options,
+ 'Environment': self.handle_environment,
+ 'Priority': self.handle_priority,
+ 'Destructive': self.handle_destructive,
+ 'Confidential': self.handle_confidential,
+ 'Type': self.handle_type,
+ 'Bug': self.handle_bug,
+ 'Bugs': self.handle_bug,
+ 'RunFor': self.handle_runfor,
+ 'Requires': self.handle_requires,
+ 'RhtsRequires': self.handle_rhtsrequires,
+ 'NeedProperty': self.handle_needproperty,
+ 'Need': self.handle_deprecated_for_needproperty,
+ 'Want': self.handle_deprecated_for_needproperty,
+ 'WantProperty': self.handle_deprecated_for_needproperty,
+ 'Kickstart': self.handle_kickstart,
+ 'Provides': self.handle_provides,
+ }
+ self.lineNum = 0;
+ for line in lines:
+ self.lineNum+=1
+ # print $line_num," ",$line;
+ # Skip comment lines:
+ if re.match('^#', line):
+ continue
+ line = line.strip()
+ # Skip pure whitespace:
+ if line=='':
+ continue
+ # Handle declarations e.g. "SiteConfig(server): hostname of server"
+ m = re.match('([^:]*)\((.*)\):(.*)', line)
+ if m:
+ (decl, arg, value) = (,,
+ # Deal with it, stripping whitespace:
+ self.__handle_declaration(decl, arg.strip(), value.strip())
+ continue
+ # Handle key/value pairs e.g.: "Bug: 123456"
+ m = re.match('([^:]*):(.*)', line)
+ if not m:
+ self.handle_error("Malformed \"Key: value\" line")
+ continue
+ (key, value) = (,
+ # Strip leading and trailing whitespace:
+ value = value.strip()
+ # Note that I'm not quoting the values; this isn't talking direct to a DB
+ if key in fields:
+ handler = fields[key]
+ handler(key, value)
+ else:
+ self.handle_warning('Unknown field "%s"'%key)
+ # Postprocessing:
+ # Ensure mandatory fields have values:
+ self.__mandatory_field('Name', 'test_name')
+ self.__mandatory_field('Description', 'test_description')
+ self.__mandatory_field('Owner', 'owner')
+class PrintingParser(Parser):
+ """
+ A parser which handles errors/warnings by printing messages to a file object
+ """
+ def __init__(self, outputFileObj, inputFilename):
+ Parser.__init__(self)
+ self.outputFileObj = outputFileObj
+ self.inputFilename = inputFilename
+ self.numErrors = 0
+ self.numWarnings = 0
+ def handle_message(self, message, severity):
+ # Try to mimic the format of a GCC output line, e.g.:
+ # tmp.c:1: error: your code sucks
+ print >> self.outputFileObj, "%s:%i: %s: %s"%(self.inputFilename, self.lineNum, severity, message)
+ def handle_error(self, message):
+ self.handle_message(message, "error")
+ self.numErrors+=1
+ def handle_warning(self, message):
+ self.handle_message(message, "warning")
+ self.numWarnings+=1
+class StdoutParser(PrintingParser):
+ """
+ A parser which handles errors/warnings by printing messages to stdout
+ """
+ def __init__(self, inputFilename):
+ PrintingParser.__init__(self, sys.stdout, inputFilename)
+class StderrParser(PrintingParser):
+ """
+ A parser which handles errors/warnings by printing messages to stderr
+ """
+ def __init__(self, inputFilename):
+ PrintingParser.__init__(self, sys.stderr, inputFilename)
+ def handle_warning(self, message):
+ pass
+class ParserError(Exception):
+ pass
+class ParserWarning(Exception):
+ pass
+class StrictParser(Parser):
+ def __init__(self, raise_errors):
+ Parser.__init__(self)
+ self.raise_errors = raise_errors
+ def handle_error(self, message):
+ if self.raise_errors:
+ raise ParserError(message)
+ def handle_warning(self, message):
+ if self.raise_errors:
+ raise ParserWarning(message)
+class SemiStrictParser(StrictParser):
+ def handle_warning(self, message):
+ pass
+def parse_string(string, raise_errors = True):
+ p = StrictParser(raise_errors)
+ p.parse(string.split("\n"))
+ return
+def parse_file(filename, raise_errors = True):
+ p = StrictParser(raise_errors)
+ p.parse(open(filename).readlines())
+ return
+#class ParserTests(unittest.TestCase):
+# def test_key_value(self):
+# raise NotImplementedError
+# def test_decl_arg_value(self):
+# raise NotImplementedError
+class NamespaceTests(unittest.TestCase):
+ def test_package_not_found(self):
+ "Ensure that we get None for the namespace of an unrecognized package"
+ self.assertEquals(None, get_namespace_for_package('foobar'))
+ def test_simple_packages(self):
+ "Ensure that we get expected namespaces back for some simple packages"
+ self.assertEquals('desktop', get_namespace_for_package('evolution'))
+ self.assertEquals('tools', get_namespace_for_package('gcc'))
+class NameFieldTests(unittest.TestCase):
+ def test_name(self):
+ "Ensure Name field is parsed correctly"
+ ti = parse_string("Name: /CoreOS/cups/foo/bar", raise_errors=False)
+ self.assertEquals(ti.test_name, "/CoreOS/cups/foo/bar")
+class PathFieldTests(unittest.TestCase):
+ def test_path_absolute(self):
+ "Ensure absolute Path field is parsed correctly"
+ ti = parse_string("Path: /mnt/tests/CoreOS/cups/foo/bar", raise_errors=False)
+ self.assertEquals(ti.test_path, "/mnt/tests/CoreOS/cups/foo/bar")
+ def test_path_relative(self):
+ "Ensure relative Path field is parsed correctly"
+ ti = parse_string("Path: CoreOS/cups/foo/bar", raise_errors=False)
+ self.assertEquals(ti.test_path, "/mnt/tests/CoreOS/cups/foo/bar")
+class DescriptionFieldTests(unittest.TestCase):
+ def test_description(self):
+ "Ensure Description field is parsed correctly"
+ ti = parse_string("Description: Ensure that the thingummy frobnicates the doohickey", raise_errors=False)
+ self.assertEquals(ti.test_description, "Ensure that the thingummy frobnicates the doohickey")
+ def test_description_with_colon(self):
+ "Ensure Description field containing a colon is parsed correctly"
+ ti = parse_string("Description: This test is from http://foo/bar", raise_errors=False)
+ self.assertEquals(ti.test_description, "This test is from http://foo/bar")
+class ReleasesFieldTests(unittest.TestCase):
+ def test_releases(self):
+ "Ensure Releases field is parsed correctly"
+ ti = parse_string("Releases: FC5 FC6", raise_errors=False)
+ self.assertEquals(ti.releases, ['FC5', 'FC6'])
+class ArchitecturesFieldTests(unittest.TestCase):
+ def test_architectures(self):
+ "Ensure Architectures field is parsed correctly"
+ ti = parse_string("Architectures: i386 x86_64", raise_errors=False)
+ self.assertEquals(ti.test_archs, ["i386", "x86_64"])
+ def test_architectures_after_releases(self):
+ "Ensure that an Architectures field following a Releases field is parsed correctly"
+ ti = parse_string("""
+ Releases: FC5 FC6
+ Architectures: i386 x86_64""", raise_errors=False)
+ self.assertEquals(ti.releases, ['FC5', 'FC6'])
+ self.assertEquals(ti.test_archs, ["i386", "x86_64"])
+class RhtsOptionsFieldTests(unittest.TestCase):
+ def test_rhtsoptions(self):
+ "Ensure RhtsOptions field is parsed correctly"
+ ti = parse_string("RhtsOptions: Compatible", raise_errors=False)
+ self.assertEquals(ti.options, ["Compatible"])
+ def test_multi_options(self):
+ "Ensure RhtsOptions field is parsed correctly"
+ ti = parse_string("RhtsOptions: Compatible -CompatService -StrongerAVC", raise_errors=False)
+ self.assertEquals(ti.options, ["Compatible", "-CompatService", "-StrongerAVC"])
+ def test_rhtsoptions_minus(self):
+ "Ensure RhtsOptions field parses options preceded with dash correctly"
+ ti = parse_string("RhtsOptions: -Compatible", raise_errors=False)
+ self.assertEquals(ti.options, ["-Compatible"])
+ def test_rhtsoption_bad_value(self):
+ "Ensure RhtsOptions field captures bad input"
+ self.assertRaises(ParserError, parse_string, "RhtsOptions: Compat", raise_errors=True)
+ def test_rhtsoption_duplicate(self):
+ "Ensure RhtsOptions field captures duplicate entries"
+ self.assertRaises(ParserError, parse_string, "RhtsOptions: Compatible\nRhtsOptions: -Compatible", raise_errors=True)
+class EnvironmentFieldTests(unittest.TestCase):
+ def test_environment(self):
+ "Ensure Environment field is parsed correctly"
+ ti = parse_string("Environment: VAR1=VAL1\nEnvironment: VAR2=Value with spaces - 2", raise_errors=False)
+ self.assertEquals(ti.environment["VAR1"], "VAL1")
+ self.assertEquals(ti.environment["VAR2"], "Value with spaces - 2")
+ def test_environment_duplicate_key(self):
+ "Ensure Environment field captures duplicate keys"
+ self.assertRaises(ParserError, parse_string, "Environment: VAR1=VAL1\nEnvironment: VAR1=Value with spaces - 2", raise_errors=True)
+ def test_environment_bad_key(self):
+ "Ensure Environment field captures bad keys"
+ self.assertRaises(ParserError, parse_string, "Environment: VAR =VAL1", raise_errors=True)
+class NotifyFieldTests(unittest.TestCase):
+ def test_notify(self):
+ "Ensure Notify field is deprecated"
+ self.assertRaises(ParserWarning, parse_string, "Notify: everyone in a 5-mile radius", raise_errors=True)
+class OwnerFieldTests(unittest.TestCase):
+ def test_owner_example(self):
+ "Ensure that the example Owner field is parsed correctly"
+ ti = parse_string("Owner: John Doe <>", raise_errors=False)
+ self.assertEquals(ti.owner, "John Doe <>")
+ def test_owner_example2(self):
+ "Ensure that other Owner fields are parsed correctly"
+ ti = parse_string("Owner: Jane Doe <>", raise_errors=False)
+ self.assertEquals(ti.owner, "Jane Doe <>")
+ #
+ def test_owner_with_hyphen(self):
+ parser = StrictParser(raise_errors=True)
+ parser.handle_owner('Owner', 'Endre Balint-Nagy <>')
+ self.assertEquals(, 'Endre Balint-Nagy <>')
+class PriorityFieldTests(unittest.TestCase):
+ def test_priority(self):
+ "Ensure Priority field is parsed correctly"
+ ti = parse_string("Priority: Manual", raise_errors=False)
+ self.assertEquals(ti.priority, "Manual")
+class BugFieldTests(unittest.TestCase):
+ def test_single_bug(self):
+ "Ensure a single Bug field works"
+ ti = parse_string("Bug: 123456", raise_errors=False)
+ self.assertEquals(ti.bugs, [123456])
+ def test_single_bugs(self):
+ "Ensure a single Bugs field works"
+ ti = parse_string("Bugs: 123456", raise_errors=False)
+ self.assertEquals(ti.bugs, [123456])
+ def test_multiple_bugs(self):
+ "Ensure that multiple values for a Bugs field work"
+ ti = parse_string("Bugs: 123456 456123", raise_errors=False)
+ self.assertEquals(ti.bugs, [123456, 456123])
+ def test_multiple_bug_lines(self):
+ "Ensure that multiple Bug and Bugs lines work"
+ ti = parse_string("""Bugs: 123456 456123
+ Bug: 987654 456789""", raise_errors=False)
+ self.assertEquals(ti.bugs, [123456, 456123, 987654, 456789])
+ def test_blank_bug(self):
+ "Ensure a blank Bug field is handled"
+ ti = parse_string("Bug: ", raise_errors=False)
+ self.assertEquals(ti.bugs, [])
+class TestVersionFieldTests(unittest.TestCase):
+ def test_testversion(self):
+ "Ensure TestVersion field is parsed correctly"
+ ti = parse_string("TestVersion: 1.1", raise_errors=False)
+ self.assertEquals(ti.testversion, "1.1")
+class LicenseFieldTests(unittest.TestCase):
+ def test_license(self):
+ "Ensure License field is parsed correctly"
+ ti = parse_string("License: GPL", raise_errors=False)
+ self.assertEquals(ti.license, "GPL")
+class TestTimeFieldTests(unittest.TestCase):
+ def test_testtime_seconds(self):
+ "Ensure TestTime field can handle seconds"
+ ti = parse_string("TestTime: 5", raise_errors=False)
+ self.assertEquals(ti.avg_test_time, 5)
+ def test_testtime_minutes(self):
+ "Ensure TestTime field can handle minutes"
+ ti = parse_string("TestTime: 10m", raise_errors=False)
+ self.assertEquals(ti.avg_test_time, 600)
+ def test_testtime_hours(self):
+ "Ensure TestTime field can handle hours"
+ ti = parse_string("TestTime: 2h", raise_errors=False)
+ self.assertEquals(ti.avg_test_time, (2*60*60))
+class RequiresFieldTests(unittest.TestCase):
+ def test_single_line_requires(self):
+ "Ensure Requires field is parsed correctly"
+ ti = parse_string("Requires: evolution dogtail", raise_errors=False)
+ self.assertEquals(ti.requires, ['evolution', 'dogtail'])
+ def test_multiline_requires(self):
+ "Ensure we can handle multiple Requires lines"
+ ti = parse_string("""Requires: evolution dogtail
+ Requires: foo bar""", raise_errors=False)
+ self.assertEquals(ti.requires, ['evolution', 'dogtail', 'foo', 'bar'])
+ def test_requires_with_case_differences(self):
+ "Ensure Requires field is parsed correctly"
+ ti = parse_string("Requires: opencryptoki openCryptoki", raise_errors=False)
+ self.assertEquals(ti.requires, ['opencryptoki', 'openCryptoki'])
+class RunForFieldTests(unittest.TestCase):
+ def test_single_line_runfor(self):
+ "Ensure RunFor field is parsed correctly"
+ ti = parse_string("RunFor: evolution dogtail", raise_errors=False)
+ self.assertEquals(ti.runfor, ['evolution', 'dogtail'])
+ def test_multiline_runfor(self):
+ "Ensure we can handle multiple RunFor lines"
+ ti = parse_string("""RunFor: evolution dogtail
+ RunFor: foo bar""", raise_errors=False)
+ self.assertEquals(ti.runfor, ['evolution', 'dogtail', 'foo', 'bar'])
+class TypeFieldTests(unittest.TestCase):
+ def test_single_line_type(self):
+ "Ensure Type field is parsed correctly"
+ ti = parse_string("Type: Crasher Regression", raise_errors=False)
+ self.assertEquals(ti.types, ['Crasher', 'Regression'])
+ def test_multiline_type(self):
+ "Ensure we can handle multiple Type lines"
+ ti = parse_string("""Type: Crasher Regression
+ Type: Performance Stress""", raise_errors=False)
+ self.assertEquals(ti.types, ['Crasher', 'Regression', 'Performance', 'Stress'])
+class NeedPropertyFieldTests(unittest.TestCase):
+ def test_single_line_needproperty(self):
+ "Ensure NeedProperty field is parsed correctly"
+ ti = parse_string("NeedProperty: PROCESSORS > 1", raise_errors=False)
+ self.assertEquals(ti.need_properties, [("PROCESSORS", ">", "1")])
+ def test_multiline_needproperty(self):
+ "Ensure we can handle multiple NeedProperty lines"
+ ti = parse_string("""
+ NeedProperty: CAKE = CHOCOLATE
+ NeedProperty: SLICES > 3
+ """, raise_errors=False)
+ self.assertEquals(ti.need_properties, [("CAKE", "=", "CHOCOLATE"), ("SLICES", ">", "3")])
+class DestructiveFieldTests(unittest.TestCase):
+ def test_destructive(self):
+ ti = parse_string("Destructive: yes", raise_errors=False)
+ self.assertEquals(ti.destructive, True)
+class SiteConfigDeclarationTests(unittest.TestCase):
+ """Unit tests for the SiteConfig declaration"""
+ def test_relative_siteconfig_without_name(self):
+ "Ensure that a relative SiteConfig declaration without a Name is handled with a sane error"
+ self.assertRaises(ParserError, parse_string, "SiteConfig(server): Hostname of server", raise_errors=True)
+ def test_flat_relative_siteconfig(self):
+ "Ensure that relative SiteConfig declarations without nesting work"
+ ti = parse_string("""
+ Name: /desktop/evolution/mail/imap/authentication/ssl
+ SiteConfig(server): Hostname of server
+ SiteConfig(username): Username to use
+ SiteConfig(password): Password to use
+ """, raise_errors=False)
+ self.assertEquals(ti.siteconfig, [('/desktop/evolution/mail/imap/authentication/ssl/server', "Hostname of server"),
+ ('/desktop/evolution/mail/imap/authentication/ssl/username', "Username to use"),
+ ('/desktop/evolution/mail/imap/authentication/ssl/password', "Password to use")
+ ])
+ def test_nested_relative_siteconfig(self):
+ "Ensure that a relative SiteConfig declaration containing a path works"
+ ti = parse_string("""
+ Name: /desktop/evolution/mail/imap/authentication
+ SiteConfig(ssl/server): Hostname of server to try SSL auth against
+ SiteConfig(ssl/username): Username to use for SSL auth
+ SiteConfig(ssl/password): Password to use for SSL auth
+ SiteConfig(tls/server): Hostname of server to try TLS auth against
+ SiteConfig(tls/username): Username to use for TLS auth
+ SiteConfig(tls/password): Password to use for TLS auth
+ """, raise_errors=False)
+ self.assertEquals(ti.siteconfig, [('/desktop/evolution/mail/imap/authentication/ssl/server', "Hostname of server to try SSL auth against"),
+ ('/desktop/evolution/mail/imap/authentication/ssl/username', "Username to use for SSL auth"),
+ ('/desktop/evolution/mail/imap/authentication/ssl/password', "Password to use for SSL auth"),
+ ('/desktop/evolution/mail/imap/authentication/tls/server', "Hostname of server to try TLS auth against"),
+ ('/desktop/evolution/mail/imap/authentication/tls/username', "Username to use for TLS auth"),
+ ('/desktop/evolution/mail/imap/authentication/tls/password', "Password to use for TLS auth")
+ ])
+ def test_absolute_siteconfig(self):
+ "Ensure that an absolute SiteConfig declaration works"
+ ti = parse_string("""SiteConfig(/stable-servers/ldap/hostname): Location of stable LDAP server to use""", raise_errors=False)
+ self.assertEquals(ti.siteconfig, [('/stable-servers/ldap/hostname', 'Location of stable LDAP server to use')])
+ #def test_siteconfig_comment(self):
+ # "Ensure that comments are stripped as expected from descriptions"
+ # ti = parse_string("SiteConfig(/foo/bar): Some value # hello world", raise_errors=False)
+ # self.assertEquals(ti.siteconfig, [('/foo/bar', "Some value")])
+ def test_siteconfig_whitespace(self):
+ "Ensure that whitespace is stripped as expected from descriptions"
+ ti = parse_string("SiteConfig(/foo/bar): Some value ", raise_errors=False)
+ self.assertEquals(ti.siteconfig, [('/foo/bar', "Some value")])
+ def test_output_relative_siteconfig(self):
+ "Ensure that the output methods collapse redundant paths in relative SiteConfig declarations"
+ ti = TestInfo()
+ ti.test_name = '/foo/bar'
+ ti.siteconfig = [('/foo/bar/baz/fubar', 'Dummy value')]
+ self.assertEquals(ti.generate_siteconfig_lines(), "SiteConfig(baz/fubar): Dummy value\n")
+class IntegrationTests(unittest.TestCase):
+ def test_example_file(self):
+ "Ensure a full example file is parsed correctly"
+ ti = parse_string("""\
+# Test comment
+Owner: Jane Doe <>
+Name: /examples/coreutils/example-simple-test
+Path: /mnt/tests/examples/coreutils/example-simple-test
+Description: This test ensures that md5sums are generated and validated correctly
+TestTime: 1m
+TestVersion: 1.1
+License: GPL
+RunFor: coreutils
+Requires: coreutils python
+ """, raise_errors=True)
+ self.assertEquals(ti.owner, "Jane Doe <>")
+ self.assertEquals(ti.test_name, "/examples/coreutils/example-simple-test")
+ self.assertEquals(ti.test_path, "/mnt/tests/examples/coreutils/example-simple-test")
+ self.assertEquals(ti.test_description, "This test ensures that md5sums are generated and validated correctly")
+ self.assertEquals(ti.avg_test_time, 60)
+ self.assertEquals(ti.testversion, "1.1")
+ self.assertEquals(ti.license, "GPL")
+ self.assertEquals(ti.runfor, ["coreutils"])
+ self.assertEquals(ti.requires, ["coreutils", "python"])
+ def test_output_testinfo(self):
+ "Output an example file, then ensure it is parsed succesfully"
+ ti1 = parse_string("""\
+# Test comment
+Owner: Jane Doe <>
+Name: /examples/coreutils/example-simple-test
+Path: /mnt/tests/examples/coreutils/example-simple-test
+Description: This test ensures that md5sums are generated and validated correctly
+TestTime: 1m
+TestVersion: 1.1
+License: GPL
+Destructive: yes
+RunFor: coreutils
+Requires: coreutils python
+NeedProperty: CAKE = CHOCOLATE
+NeedProperty: SLICES > 3
+SiteConfig(server): Hostname of server
+SiteConfig(username): Username to use
+SiteConfig(password): Password to use
+SiteConfig(ssl/server): Hostname of server to try SSL auth against
+SiteConfig(ssl/username): Username to use for SSL auth
+SiteConfig(ssl/password): Password to use for SSL auth
+SiteConfig(tls/server): Hostname of server to try TLS auth against
+SiteConfig(tls/username): Username to use for TLS auth
+SiteConfig(tls/password): Password to use for TLS auth
+SiteConfig(/stable-servers/ldap/hostname): Location of stable LDAP server to use
+ """, raise_errors=True)
+ file = tempfile.NamedTemporaryFile(mode='w')
+ ti1.output(file)
+ file.flush()
+ p = StrictParser(raise_errors=True)
+ p.parse(open(, "r").readlines())
+ ti2=
+ self.assertEquals(ti2.owner, "Jane Doe <>")
+ self.assertEquals(ti2.test_name, "/examples/coreutils/example-simple-test")
+ self.assertEquals(ti2.test_path, "/mnt/tests/examples/coreutils/example-simple-test")
+ self.assertEquals(ti2.test_description, "This test ensures that md5sums are generated and validated correctly")
+ self.assertEquals(ti2.avg_test_time, 60)
+ self.assertEquals(ti2.testversion, "1.1")
+ self.assertEquals(ti2.license, "GPL")
+ self.assertEquals(ti2.destructive, True)
+ self.assertEquals(ti2.runfor, ["coreutils"])
+ self.assertEquals(ti2.requires, ["coreutils", "python"])
+ self.assertEquals(ti2.need_properties, [('CAKE', '=', 'CHOCOLATE'), ('SLICES', '>', '3')])
+ self.assertEquals(ti2.siteconfig, [('/examples/coreutils/example-simple-test/server', 'Hostname of server'),
+ ('/examples/coreutils/example-simple-test/username', 'Username to use'),
+ ('/examples/coreutils/example-simple-test/password', 'Password to use'),
+ ('/examples/coreutils/example-simple-test/ssl/server', 'Hostname of server to try SSL auth against'),
+ ('/examples/coreutils/example-simple-test/ssl/username', 'Username to use for SSL auth'),
+ ('/examples/coreutils/example-simple-test/ssl/password', 'Password to use for SSL auth'),
+ ('/examples/coreutils/example-simple-test/tls/server', 'Hostname of server to try TLS auth against'),
+ ('/examples/coreutils/example-simple-test/tls/username', 'Username to use for TLS auth'),
+ ('/examples/coreutils/example-simple-test/tls/password', 'Password to use for TLS auth'),
+ ('/stable-servers/ldap/hostname', 'Location of stable LDAP server to use')])
+if __name__=='__main__':
+ unittest.main()