#!/usr/bin/python
# vim: fileencoding=utf8 foldmethod=marker
# {{{ License header: GPLv2+
# This file is part of fedora-easy-karma.
#
# Fedora-easy-karma is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Fedora-easy-karma is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fedora-easy-karma. If not, see .
# }}}
# default python modules
import cPickle as pickle
import datetime
import fnmatch
import getpass
import itertools
import os
import readline
import sys
from optparse import OptionParser
from textwrap import wrap
# extra python modules
import fedora
# fedora_cert is optional. It is only used to get the real fas_username, which
# is also supplied as a command line option and eventually in a config file.
try:
import fedora_cert
except ImportError:
pass
import yum
from fedora.client.bodhi import BodhiClient
class FEK_helper(object):
@staticmethod
def bodhi_update_str(update, bodhi_base_url="https://admin.fedoraproject.org/updates/", bugzilla_bug_url="https://bugzilla.redhat.com/", test_cases_url="https://fedoraproject.org/wiki/", wrap_bugs=True, width=80):
# copy update to avoid side effects
values = dict(update)
format_string = (
"%(header_line)s\n"
"%(title)s\n"
"%(header_line)s\n"
"%(updateid)s"
" Release: %(release)s\n"
" Status: %(status)s\n"
" Type: %(type)s\n"
" Karma: %(karma)d\n"
"%(request)s"
"%(bugs)s"
"%(test_cases)s"
"%(notes)s"
" Submitter: %(submitter)s\n"
" Submitted: %(date_submitted)s\n"
"%(comments)s"
"\n%(update_url)s"
)
values["header_line"] = "=" * width
values["title"] = "\n".join(wrap(update["title"].replace(",", ", "), width=width, initial_indent=" "*5, subsequent_indent=" "*5))
if update["updateid"]:
values["updateid"] = " Update ID: %s\n" % update["updateid"]
else:
values["updateid"] = ""
values["release"] = update["release"]["long_name"]
values["type"] = ""
if "critpath" in update and update["critpath"]:
if not update["critpath_approved"]:
values["type"] = "unapproved "
values["type"] += "critpath "
values["type"] += update["type"]
if update["request"]:
values["request"] = " Request: %s\n" % update["request"]
else:
values["request"] = ""
if len(update["bugs"]):
bugs = []
for bug in update["bugs"]:
bz_id = bug["bz_id"]
if bugzilla_bug_url:
bz_id = "%s%d" % ( bugzilla_bug_url, bz_id)
bz_title = bug["title"]
bugs.append("%s - %s" % (bz_id, bz_title))
if wrap_bugs:
values["bugs"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(bugs, first_prefix=" Bugs: ", width=width, extra_newline=True)
else:
values["bugs"] = " Bugs: %s\n" % ("\n" + " " * 11 + ": ").join(bugs)
else:
values["bugs"] = ""
if update["nagged"] and "test_cases" in update["nagged"]:
test_cases = ["%s%s" % (test_cases_url, t.replace(" ", "_").replace(":", "%3A")) for t in update["nagged"]["test_cases"]]
values["test_cases"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(test_cases, first_prefix=" Test Cases: ", width=width, extra_newline=True)
else:
values["test_cases"] = ""
if update["notes"]:
values["notes"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(update["notes"].split("\r\n"), first_prefix=" Notes: ", width=width)
else:
values["notes"] = ""
if len(update["comments"]):
val = " Comments: "
comments = []
for comment in update["comments"]:
# copy comment to avoid side effects
comment = dict(comment)
indent = " " * 13
comment["indent"] = indent
if comment["anonymous"]:
comment["author"] += " (unauthenticated)"
comments.append("%(indent)s%(author)s - %(timestamp)s (karma %(karma)s)" % comment)
if comment["text"]:
wrapped = wrap(comment["text"], initial_indent=indent, subsequent_indent=indent, width=width)
comments.append("\n".join(wrapped))
val += "\n".join(comments).lstrip() + "\n"
values["comments"] = val
else:
values["comments"] = ""
if update["updateid"]:
url_path = "%s/%s" % (update["release"]["name"], update["updateid"])
else:
url_path = update["title"]
values["update_url"] = " %s%s\n" % (bodhi_base_url, url_path)
return format_string % values
@staticmethod
def wrap_paragraphs(paragraphs, width=67, subsequent_indent=(" "*11 + ": "), second_column_indent=0):
return ("\n%s" % subsequent_indent).join(map(lambda p: "\n".join(wrap(p, width=width, subsequent_indent=(subsequent_indent + " " * second_column_indent))), paragraphs))
@staticmethod
def wrap_paragraphs_prefix(paragraphs, first_prefix, width=80, extra_newline=False):
if isinstance(paragraphs, basestring):
paragraphs = paragraphs.split("\n")
if first_prefix:
subsequent_indent = " " * (len(first_prefix) - 2) + ": "
else:
subsequent_indent = ""
output = []
first = True
wrapped = []
# remove trailing empty paragraphs
while paragraphs and paragraphs[-1] == "":
paragraphs.pop()
for p in paragraphs:
if extra_newline and len(wrapped) > 1:
output.append("")
if first:
p = first_prefix + p
first = False
wrapped = wrap(p, width=width, subsequent_indent=subsequent_indent)
output.append("\n".join(wrapped))
return ("\n%s" % subsequent_indent).join(output)
class FedoraEasyKarma(object):
def __init__(self):
usage = (
"usage: %prog [options] [pattern, ..] \n\n"
"You will be asked for every package installed from updates-testing to provide feedback using karma points. "
"If patterns are provided, you will be only prompted for updates related to packages or builds that match any "
"of the patterns. Possible wildcards are *, ?, [seq] and [!seq] as explained at http://docs.python.org/library/fnmatch.html\n"
"After selecting the karma points, you will be asked for a comment. An empty comment skips the update.\n\n"
"Possible karma points are:\n"
"-1 : Update breaks something or does not fix a bug it is supposed to\n"
" 0 : The update has not been tested much or at all\n"
" 1 : The update seems not to break anything new\n"
"All other inputs will skip the update.\n"
"You can use - on an empty prompt to exit\n"
"If you use a default comment, '- ' can be used to delete the default comment to easily enter a custom one.\n"
"\n"
"The source can be found at\n"
"http://fedorapeople.org/gitweb?p=till/public_git/fedora-easy-karma.git;a=summary\n"
"Please send bug reports and feature requests to\n"
"'Till Maas '\n"
"For patches please use 'git send-email'."
)
usage = FEK_helper.wrap_paragraphs_prefix(usage, first_prefix="", width=80, extra_newline=False)
parser = OptionParser(usage=usage)
parser.add_option("", "--bodhi-cached", dest="bodhi_cached", help="Use cached bodhi query", action="store_true", default=False)
parser.add_option("", "--bodhi-cachedir", dest="bodhi_cachedir", help="Directory to store bodhi cache, default: %default", default="~/.fedora-easy-karma")
parser.add_option("", "--bodhi-update-cache", dest="bodhi_update_cache", help="Update bodhi query cache", action="store_true", default=False)
parser.add_option("", "--critpath-only", dest="critpath_only", help="Only consider unapproved critpath updates", action="store_true", default=False)
parser.add_option("", "--debug", dest="debug", help="Enable debug output", action="store_true", default=False)
parser.add_option("", "--default-comment", dest="default_comment", help="Default comment to use, default: %default", default="", metavar="COMMENT")
parser.add_option("", "--default-karma", dest="default_karma", help="Default karma to use, default: %default", default="", metavar="KARMA")
parser.add_option("", "--fas-username", dest="fas_username", help="FAS username", default=None)
parser.add_option("", "--include-commented", dest="include_commented", help="Also ask for more comments on updates that already got a comment from you, this is enabled if patterns are provided", action="store_true", default=False)
parser.add_option("", "--installed-max-days", dest="installed_max_days", help="Only check packages installed within the last XX days, default: %default", metavar="DAYS", default=28, type="int")
parser.add_option("", "--installed-min-days", dest="installed_min_days", help="Only check packages installed for at least XX days, default: %default", metavar="DAYS", default=0, type="int")
parser.add_option("", "--list-rpms-only", dest="list_rpms_only", help="Only list affected rpms", action="store_true", default=False)
parser.add_option("", "--no-skip-empty-comment", dest="skip_empty_comment", help="Do not skip update if comment is empty", action="store_false", default=True)
parser.add_option("", "--product", dest="product", help="product to query Bodhi for, 'F' for Fedora, 'EL-' for EPEL, default: %default", default="F")
parser.add_option("", "--releasever", dest="releasever", help="releasever to query Bodhi for, default: releasever from yum", default=None)
parser.add_option("", "--retries", dest="retries", help="Number if retries when submitting a comment in case of an error, default: %default", default=3, type="int")
parser.add_option("", "--wrap-bugs", dest="wrap_bugs", help="Apply line-wrapping to bugs", action="store_true", default=False)
parser.add_option("", "--wrap-rpms", dest="wrap_rpms", help="Apply line-wrapping to list of installed rpms", action="store_true", default=False)
parser.add_option("", "--wrap-width", dest="wrap_width", help="Width to use for line wrapping of updates, default: %default", default=80, type="int")
(self.options, args) = parser.parse_args()
if args:
self.options.include_commented = True
if self.options.debug:
self.options.debug = datetime.datetime.now()
if not self.options.fas_username:
try:
try:
fas_username = fedora_cert.read_user_cert()
except (fedora_cert.fedora_cert_error):
self.debug("fedora_cert_error")
raise NameError
except NameError:
self.debug("fas_username NameError")
fas_username = os.environ["LOGNAME"]
self.options.fas_username = fas_username
bc = BodhiClient(username=self.options.fas_username, useragent="Fedora Easy Karma/GIT")
my = yum.YumBase()
my.preconf.debuglevel = 0
if not self.options.releasever:
self.options.releasever = my.conf.yumvar["releasever"]
release = "%s%s" % (self.options.product, self.options.releasever)
self.options.bodhi_cachedir = os.path.expanduser(self.options.bodhi_cachedir)
installed_testing_builds = {}
now = datetime.datetime.now()
installed_max_days = datetime.timedelta(self.options.installed_max_days)
installed_min_days = datetime.timedelta(self.options.installed_min_days)
self.info("Getting list of installed packages...")
self.debug("starting yum query")
# make pkg objects subscriptable, i.e. pkg["name"] work
yum.rpmsack.RPMInstalledPackage.__getitem__ = lambda self, key: getattr(self, key)
for pkg in my.rpmdb.returnPackages():
installed = datetime.datetime.fromtimestamp(pkg.installtime)
installed_timedelta = now - installed
if installed_timedelta < installed_max_days and installed_timedelta > installed_min_days:
build = pkg.sourcerpm[:-8]
if build in installed_testing_builds:
installed_testing_builds[build].append(pkg)
else:
installed_testing_builds[build] = [pkg]
cachefile_name = os.path.join(self.options.bodhi_cachedir, "bodhi-cache-%s.cpickle" % release)
if self.options.bodhi_cached:
self.debug("reading bodhi cache")
cachefile = open(cachefile_name, "rb")
testing_updates = pickle.load(cachefile)
cachefile.close()
else:
self.info("Getting list of packages in updates-testing...")
self.debug("starting bodhi query")
# probably raises some exceptions
testing_updates = bc.query(release=release, status="testing", limit=1000)["updates"]
# can't query for requestless as of python-fedora 0.3.18
# (request=None results in no filtering by request)
testing_updates = [x for x in testing_updates if not x["request"]]
# extend list of updates with updates that are going to testing to
# support manually installed rpms from koji
testing_updates.extend(bc.query(release=release, status="pending", request="testing", limit=1000)["updates"])
if self.options.bodhi_update_cache:
try:
os.makedirs(self.options.bodhi_cachedir)
except OSError:
# only pass for Errno 17: file exists
self.debug("makedirs OSError", update_timestamp=False)
self.debug("writing cache")
outfile = open(cachefile_name, "wb")
pickle.dump(testing_updates, outfile, -1)
outfile.close()
self.debug("post processing bodhi query")
# reduce to unapproved critpath updates. Cannot query for this in
# python-fedora 0.3.20 and might not want to do to keep the cache
# complete
if self.options.critpath_only:
testing_updates = [u for u in testing_updates if u["critpath"] and not u["critpath_approved"]]
# create a mapping build -> update
testing_builds = {}
for update in testing_updates:
if self.options.include_commented or not self.already_commented(update, self.options.fas_username):
for build in update["builds"]:
testing_builds[build["nvr"]] = update
self.debug("starting feedback loop")
# multiple build can be grouped together in one update, only ask once per update
processed_updates = []
builds = testing_builds.keys()
builds.sort()
for build in builds:
update = testing_builds[build]
if update not in processed_updates and build in installed_testing_builds:
processed_updates.append(update)
affected_builds = [b["nvr"] for b in update["builds"]]
installed_pkgs = list(itertools.chain(*[installed_testing_builds[b] for b in affected_builds if b in installed_testing_builds]))
if args:
if not self.match_any(args, [["%(name)s" % pkg for pkg in installed_pkgs],
# remove version and release
["-".join(b.split("-")[:-2]) for b in affected_builds]]):
continue
installed_rpms = [self.format_rpm(pkg) for pkg in installed_pkgs]
if not self.options.list_rpms_only:
print FEK_helper.bodhi_update_str(update, bodhi_base_url=bc.base_url, width=self.options.wrap_width, wrap_bugs=self.options.wrap_bugs)
if self.options.wrap_rpms:
print FEK_helper.wrap_paragraphs_prefix(installed_rpms, first_prefix=" inst. RPMS: ", width=self.options.wrap_width)
else:
print " inst. RPMS: %s\n" % ("\n" + " " * 11 + ": ").join(installed_rpms)
if self.already_commented(update, self.options.fas_username):
print "!!! already commented by you !!!"
try:
karma = self.raw_input("Comment? -1/0/1 ->karma, other -> skip> ", default=self.options.default_karma, add_to_history=False)
if karma in ["-1", "0", "1"]:
comment = self.raw_input("Comment> ", default=self.options.default_comment)
if comment or not self.options.skip_empty_comment:
result = self.send_comment(bc, update, comment, karma)
if not result[0]:
self.warning("Comment not submitted: %s" % result[1])
else:
print "skipped because of empty comment"
except EOFError:
sys.stdout.write("\nExiting on User request\n")
sys.exit(0)
else:
print "\n".join(installed_rpms)
def already_commented(self, update, user):
for comment in update["comments"]:
# :TODO:WORKAROUND:
# .split(" ")[0] is needed to work around bodhi using
# 'fas_username (group)' in the author field. Hopefully
# bodhi will eventually not do this anymore.
# References:
# https://fedorahosted.org/bodhi/ticket/400
# https://bugzilla.redhat.com/show_bug.cgi?id=572228
if not comment["anonymous"] and comment["author"].split(" ")[0] == user:
return True
return False
def debug(self, message, update_timestamp=True):
if self.options.debug:
now = datetime.datetime.now()
delta = now - self.options.debug
message = "DEBUG: %s - timedelta: %s" % (message, delta)
if update_timestamp:
self.options.debug = now
else:
message = "%s - timestamp not updated" % message
sys.stderr.write("%s\n" % message)
def format_rpm(self, rpm):
now = datetime.datetime.now()
install_age=(now - datetime.datetime.fromtimestamp(rpm.installtime))
res = "%(name)s-%(version)s-%(release)s.%(arch)s - %(summary)s" % rpm
res += " (installed %s days ago)" % install_age.days
return res
def info(self, message):
sys.stderr.write("%s\n" % message)
def match_any(self, patterns, names):
for name in list(itertools.chain(*names)):
for pattern in patterns:
if fnmatch.fnmatch(name, pattern):
return True
return False
def warning(self, message):
sys.stderr.write("Warning: %s\n" % message)
def raw_input(self, prompt, default="", add_to_history=True):
def pre_input_hook():
readline.insert_text(default)
readline.redisplay()
readline.set_pre_input_hook(pre_input_hook)
try:
return raw_input(prompt)
finally:
readline.set_pre_input_hook(None)
if not add_to_history:
try:
readline.remove_history_item(readline.get_current_history_length() - 1)
# raised when CTRL-D is used on first prompt
except ValueError:
pass
def send_comment(self, bc, update, comment, karma):
for retry in range(0, self.options.retries + 1):
try:
res = bc.comment(update["title"], comment, karma=karma)
return (True, res)
except fedora.client.AuthError, e:
self.warning("Authentication error")
bc.password = getpass.getpass('FAS Password for %s: ' % self.options.fas_username)
except fedora.client.ServerError, e:
self.warning("Server error: %s" % str(e))
return (False, 'too many errors')
if __name__ == "__main__":
FedoraEasyKarma()