#!/usr/bin/python
# vim: fileencoding=utf8
# Copyright 2010-2017 Till Maas and others
# This file is part of fedora-easy-karma.
#
# Fedora-easy-karma is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Fedora-easy-karma is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with fedora-easy-karma.  If not, see <http://www.gnu.org/licenses/>.

# default python modules
import cPickle as pickle
import datetime
import fnmatch
import getpass
import itertools
import os
import readline
import sys
import re

from optparse import OptionParser
from textwrap import wrap

# extra python modules
import fedora
# fedora_cert is optional. It is only used to get the real fas_username, which
# is also supplied as a command line option and eventually in a config file.
try:
    import fedora_cert
except ImportError:
    pass

# Prefer dnf; yum is only needed (and only imported) when dnf is missing.
try:
    import dnf
except ImportError:
    dnf = None
    import yum

from fedora.client.bodhi import BodhiClient

PROMPT = "Comment? -1/0/1 -> karma, 'i' -> ignore, other -> skip> "


def munch_to_dict(munch):
    """ Recursively convert Munch class used in Bodhi2 to dict for easy pretty
    printing

    Lists are converted element by element; any other value is returned
    unchanged.
    """
    # The class is identified by its printed name so that the munch module
    # does not have to be imported here.
    if str(munch.__class__) == "<class 'munch.Munch'>":
        return dict((key, munch_to_dict(value))
                    for key, value in munch.items())
    elif munch.__class__ == list:
        return [munch_to_dict(i) for i in munch]
    else:
        return munch


class FEK_helper(object):
    """Static helpers that render Bodhi updates as wrapped plain text."""

    @staticmethod
    def bodhi_update_str(
            update, bodhi_base_url="https://bodhi.fedoraproject.org/",
            bugzilla_bug_url="https://bugzilla.redhat.com/",
            test_cases_url="https://fedoraproject.org/wiki/",
            wrap_bugs=True, width=80):
        """Return a multi-line, human readable description of an update.

        update: mapping describing the update. The keys read here are
            title, updateid, release, status, type, karma, stable_karma,
            request, bugs, test_cases, notes, submitter, date_submitted,
            comments and alias (plus critpath/date_approved if present).
        bodhi_base_url: prefix used to build the link to the update
        bugzilla_bug_url: prefix for bug ids; a false value prints bare ids
        test_cases_url: prefix for the test case links
        wrap_bugs: wrap the bug titles to ``width`` instead of printing
            one unwrapped line per bug
        width: line width used for wrapping and for the header lines
        """
        # copy update to avoid side effects
        values = dict(update)

        # All labels are right-aligned so that the colon ends up in the same
        # column as in the 11-space + ": " continuation indent used below.
        format_string = (
            "%(header_line)s\n"
            "%(title)s\n"
            "%(header_line)s\n"
            "%(updateid)s"
            "    Release: %(release)s\n"
            "     Status: %(status)s\n"
            "       Type: %(type)s\n"
            "      Karma: %(karma_status)s\n"
            "%(request)s"
            "%(bugs)s"
            "%(test_cases)s"
            "%(notes)s"
            "  Submitter: %(submitter)s\n"
            "  Submitted: %(date_submitted)s\n"
            "%(comments)s"
            "\n%(update_url)s")

        values["header_line"] = "=" * width
        values["title"] = "\n".join(
            wrap(update["title"].replace(",", ", "),
                 width=width,
                 initial_indent=" " * 5,
                 subsequent_indent=" " * 5)
        )

        if update["updateid"]:
            values["updateid"] = "  Update ID: %s\n" % update["updateid"]
        else:
            values["updateid"] = ""

        values["release"] = update["release"]["long_name"]

        values["type"] = ""
        if "critpath" in update and update["critpath"]:
            # I'm not sure if this is what the data member actually means,
            # assuming for now
            if not update["date_approved"]:
                values["type"] = "unapproved "
            values["type"] += "critpath "
        values["type"] += update["type"]

        if update["request"]:
            values["request"] = "    Request: %s\n" % update["request"]
        else:
            values["request"] = ""

        if update["bugs"]:
            bugs = []
            for bug in update["bugs"]:
                bug_id = bug["bug_id"]
                if bugzilla_bug_url:
                    bug_id = "%s%d" % (bugzilla_bug_url, bug_id)
                bz_title = bug["title"]
                bugs.append("%s - %s" % (bug_id, bz_title))

            if wrap_bugs:
                values["bugs"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(
                    bugs,
                    first_prefix="       Bugs: ",
                    width=width,
                    extra_newline=True
                )
            else:
                values["bugs"] = "       Bugs: %s\n" % (
                    "\n" + " " * 11 + ": ").join(bugs)
        else:
            values["bugs"] = ""

        test_cases = []
        for case in update["test_cases"]:
            tc = case['name']
            # turn the test case name into a wiki page name
            tc = tc.replace(" ", "_").replace(":", "%3A")
            test_cases.append(test_cases_url + tc)

        if test_cases:
            values["test_cases"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(
                test_cases,
                first_prefix=" Test Cases: ",
                width=width,
                extra_newline=True
            )
        else:
            values["test_cases"] = ""

        if update["notes"]:
            values["notes"] = "%s\n" % FEK_helper.wrap_paragraphs_prefix(
                update["notes"].split("\r\n"),
                first_prefix="      Notes: ",
                width=width
            )
        else:
            values["notes"] = ""

        if update["comments"]:
            val = "   Comments: "
            comments = []
            for comment in update["comments"]:
                # copy comment to avoid side effects
                comment = dict(comment)

                indent = " " * 13
                comment["indent"] = indent

                # the format of the user has changed, add a data member
                comment["username"] = comment["user"]["name"]
                if comment["anonymous"]:
                    comment["username"] += " (unauthenticated)"

                comments.append(
                    "%(indent)s%(username)s - %(timestamp)s "
                    "(karma %(karma)s)" % comment
                )

                if comment["text"]:
                    wrapped = wrap(comment["text"],
                                   initial_indent=indent,
                                   subsequent_indent=indent,
                                   width=width)
                    comments.append("\n".join(wrapped))

            # the label takes the place of the first comment's indentation
            val += "\n".join(comments).lstrip() + "\n"
            values["comments"] = val
        else:
            values["comments"] = ""

        if update["alias"]:
            url_path = update["alias"]
        else:
            url_path = update["title"]

        values["update_url"] = " %supdates/%s\n" % (bodhi_base_url, url_path)

        # stable_karma can come back as None, meaning that auto-requesting push
        # to stable has been disabled.
        # https://github.com/fedora-infra/bodhi/issues/274
        if values['stable_karma'] is None:
            values['karma_status'] = "%d" % (values["karma"])
        else:
            values["karma_status"] = "%d/%s" % (values["karma"],
                                                values["stable_karma"])

        return format_string % values

    @staticmethod
    def wrap_paragraphs(paragraphs, width=67,
                        subsequent_indent=(" " * 11 + ": "),
                        second_column_indent=0):
        """Wrap every paragraph to ``width`` and join the results.

        Paragraphs are joined with a newline followed by
        ``subsequent_indent``; continuation lines inside a paragraph get
        ``subsequent_indent`` plus ``second_column_indent`` spaces.
        """
        continuation = subsequent_indent + " " * second_column_indent
        wrapped = ["\n".join(wrap(p, width=width,
                                  subsequent_indent=continuation))
                   for p in paragraphs]
        return ("\n%s" % subsequent_indent).join(wrapped)

    @staticmethod
    def wrap_paragraphs_prefix(paragraphs, first_prefix, width=80,
                               extra_newline=False):
        """Wrap paragraphs, starting the first one with ``first_prefix``.

        paragraphs: list of strings, or a single string that is split on
            newlines
        first_prefix: text put in front of the first paragraph. Every
            following line is indented to the same width, ending in ": ".
            With an empty prefix no indentation is added.
        width: maximum line width
        extra_newline: add an empty line in front of a paragraph if the
            preceding paragraph needed more than one line
        """
        if isinstance(paragraphs, basestring):
            paragraphs = paragraphs.split("\n")
        else:
            # work on a copy, trailing empty paragraphs are removed below
            paragraphs = list(paragraphs)

        if first_prefix:
            subsequent_indent = " " * (len(first_prefix) - 2) + ": "
        else:
            subsequent_indent = ""

        output = []
        first = True
        wrapped = []

        # remove trailing empty paragraphs
        while paragraphs and paragraphs[-1] == "":
            paragraphs.pop()

        for p in paragraphs:
            # ``wrapped`` still holds the lines of the previous paragraph
            if extra_newline and len(wrapped) > 1:
                output.append("")
            if first:
                p = first_prefix + p
                first = False

            wrapped = wrap(p, width=width,
                           subsequent_indent=subsequent_indent)
            output.append("\n".join(wrapped))

        return ("\n%s" % subsequent_indent).join(output)


USAGE = """usage: %prog [options] [pattern, ..]

You will be asked for every package installed from updates-testing to provide
feedback using karma points. If patterns are provided, you will be only
prompted for updates related to packages or builds that match any of the
patterns. Possible wildcards are *, ?, [seq] and [!seq] as explained at
http://docs.python.org/library/fnmatch.html

Possible values in the karma prompt:
-1,0 or 1: Assign the respective karma value to the update
i: Ignore the update in the future
Other inputs will skip the update.
After assigning karma to the update, a comment needs to be provided, otherwise
the update will be skipped.

Note: <CTRL>-<D> on an empty prompt exits the program.
If you use a default comment, '<CTRL>-<U>' can be used to delete the
default comment to easily enter a custom one.

For further documentation, please visit:
https://fedoraproject.org/wiki/Fedora_Easy_Karma

Copyright 2010-2017 Till Maas and others
fedora-easy-karma is distributed under the terms of the GNU General Public
License
The source is available at:
https://pagure.io/fedora-easy-karma
"""


class PkgHelper(object):
    """Access the installed packages through dnf or, as fallback, yum."""

    def __init__(self):
        """Set up the package manager and detect the release version."""
        if dnf is not None:
            self.my = dnf.Base()
            # if not dnf.util.am_i_root():
            #     cachedir = dnf.yum.misc.getCacheDir()
            #     my.conf.cachedir = cachedir
            self.my.fill_sack()
            self.releasever = dnf.rpm.detect_releasever("/")
            self.package_manager = 'dnf'
            # make pkg objects subscriptable, i.e. pkg["name"] work
            dnf.package.Package.__getitem__ = lambda self, key: \
                getattr(self, key)
        else:
            self.my = yum.YumBase()
            self.my.preconf.debuglevel = 0
            self.releasever = self.my.conf.yumvar["releasever"]
            if not self.releasever.isdigit():
                # keep only the leading digits of the release version
                self.releasever = re.match(r'^(\d+)',
                                           self.releasever).group(1)
            self.package_manager = 'yum'
            # make pkg objects subscriptable, i.e. pkg["name"] work
            yum.rpmsack.RPMInstalledPackage.__getitem__ = lambda self, key: \
                getattr(self, key)

    @property
    def installed_packages(self):
        """The installed packages as reported by the package manager."""
        if dnf is not None:
            return self.my.sack.query().installed()
        else:
            return self.my.rpmdb.returnPackages()


class FedoraEasyKarma(object):
    """The fedora-easy-karma command line program.

    Creating an instance runs the whole program: options are parsed from
    the command line, Bodhi is queried and the interactive feedback loop is
    processed.
    """

    def __init__(self):
        """Parse the command line and run the interactive feedback loop."""
        usage = FEK_helper.wrap_paragraphs_prefix(
            USAGE,
            first_prefix="",
            width=80,
            extra_newline=False)
        parser = OptionParser(usage=usage)

        parser.add_option("",
                          "--bodhi-cached",
                          dest="bodhi_cached",
                          help="Use cached bodhi query",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--bodhi-update-cache",
                          dest="bodhi_update_cache",
                          help="Update bodhi query cache",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--critpath-only",
                          dest="critpath_only",
                          help="Only consider unapproved critpath updates",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--datadir",
                          dest="datadir",
                          help="Directory to store cache or ignore data, "
                               "default: %default",
                          default="~/.fedora-easy-karma")
        parser.add_option("",
                          "--debug",
                          dest="debug",
                          help="Enable debug output",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--default-comment",
                          dest="default_comment",
                          help="Default comment to use, default: %default",
                          default="",
                          metavar="COMMENT")
        parser.add_option("",
                          "--default-karma",
                          dest="default_karma",
                          help="Default karma to use, default: %default",
                          default="",
                          metavar="KARMA")
        parser.add_option("",
                          "--fas-username",
                          dest="fas_username",
                          help="FAS username",
                          default=None)
        parser.add_option("",
                          "--no-ignore-own",
                          dest="ignore_own",
                          help="Do not ignore own updates.",
                          action="store_false",
                          default=True)
        parser.add_option("",
                          "--include-commented",
                          dest="include_commented",
                          help="Also ask for more comments on updates that "
                               "already got a comment from you, this is "
                               "enabled if patterns are provided",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--include-ignored",
                          dest="include_ignored",
                          help="Also ask for comments on updates that have "
                               "been ignored previously.",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--installed-max-days",
                          dest="installed_max_days",
                          help="Only check packages installed within the last "
                               "XX days, default: %default",
                          metavar="DAYS",
                          default=28,
                          type="int")
        parser.add_option("",
                          "--installed-min-days",
                          dest="installed_min_days",
                          help="Only check packages installed for at least "
                               "XX days, default: %default",
                          metavar="DAYS",
                          default=0,
                          type="int")
        parser.add_option("",
                          "--ipdb",
                          dest="ipdb",
                          help="Launch ipbd for debugging",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--list-rpms-only",
                          dest="list_rpms_only",
                          help="Only list affected rpms",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--no-skip-empty-comment",
                          dest="skip_empty_comment",
                          help="Do not skip update if comment is empty",
                          action="store_false",
                          default=True)
        parser.add_option("",
                          "--product",
                          dest="product",
                          help="product to query Bodhi for, 'F' for Fedora, "
                               "'EL-' for EPEL, default: %default",
                          default="F")
        parser.add_option("",
                          "--releasever",
                          dest="releasever",
                          help="releasever to query Bodhi for, "
                               "default: releasever from dnf or yum",
                          default=None)
        parser.add_option("",
                          "--retries",
                          dest="retries",
                          help="Number if retries when submitting a comment "
                               "in case of an error, default: %default",
                          default=3,
                          type="int")
        parser.add_option("",
                          "--wrap-bugs",
                          dest="wrap_bugs",
                          help="Apply line-wrapping to bugs",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--wrap-rpms",
                          dest="wrap_rpms",
                          help="Apply line-wrapping to list of installed rpms",
                          action="store_true",
                          default=False)
        parser.add_option("",
                          "--wrap-width",
                          dest="wrap_width",
                          help="Width to use for line wrapping of updates, "
                               "default: %default",
                          default=80,
                          type="int")

        (self.options, args) = parser.parse_args()

        if args:
            self.options.include_commented = True

        # From here on options.debug holds the timestamp of the previous
        # debug message instead of a plain boolean, see debug().
        if self.options.debug:
            self.options.debug = datetime.datetime.now()

        if self.options.product == "F":
            release_filename = "/etc/fedora-release"
            try:
                with open(release_filename, "rb") as release_file:
                    if "Rawhide" in release_file.read():
                        print("'Rawhide' found in %s, aborting, because "
                              "there is no updates-testing for "
                              "Rawhide" % release_filename)
                        sys.exit(1)
            except IOError:
                self.warning("Cannot read '%s', this system might not be "
                             "supported" % release_filename)

        if not self.options.fas_username:
            try:
                # A NameError is raised by the lookup of fedora_cert itself
                # if the optional module could not be imported.
                try:
                    fas_username = fedora_cert.read_user_cert()
                except (fedora_cert.fedora_cert_error):
                    self.debug("fedora_cert_error")
                    raise NameError
            except NameError:
                self.debug("fas_username NameError")
                fas_username = os.environ["LOGNAME"]
            self.options.fas_username = fas_username

        # note that the retry logic in the bodhi client is currently not
        # functional
        # https://github.com/fedora-infra/python-fedora/issues/144
        bc = BodhiClient(username=self.options.fas_username,
                         useragent="Fedora Easy Karma/GIT",
                         retries=self.options.retries)
        self.bc = bc
        # Bodhi is too slow for our queries, therefore wait longer
        bc.timeout = 300

        pkghelper = PkgHelper()
        if not self.options.releasever:
            self.options.releasever = pkghelper.releasever
        release = "%s%s" % (self.options.product, self.options.releasever)

        self.options.datadir = os.path.expanduser(self.options.datadir)

        # mapping: build (source rpm name without ".src.rpm") -> packages
        installed_testing_builds = {}
        now = datetime.datetime.now()
        installed_max_days = datetime.timedelta(
            self.options.installed_max_days)
        installed_min_days = datetime.timedelta(
            self.options.installed_min_days)

        self.info("Getting list of installed packages...")
        self.debug("starting %s query" % pkghelper.package_manager)
        for pkg in pkghelper.installed_packages:
            installed = datetime.datetime.fromtimestamp(pkg.installtime)
            installed_timedelta = now - installed
            if installed_min_days < installed_timedelta < installed_max_days:
                # strip the trailing ".src.rpm"
                build = pkg.sourcerpm[:-8]
                installed_testing_builds.setdefault(build, []).append(pkg)

        cachefile_name = os.path.join(
            self.options.datadir, "bodhi-cache-%s.cpickle" % release)
        if self.options.bodhi_cached:
            self.debug("reading bodhi cache")
            try:
                # NOTE: pickle must only be used on trusted data; the cache
                # is read from the user's own data directory.
                with open(cachefile_name, "rb") as cachefile:
                    testing_updates = pickle.load(cachefile)
            except IOError as ioe:
                print("Cannot access bodhi cache file: %s" % cachefile_name)
                sys.exit(ioe.errno)
        else:
            self.info("Waiting for Bodhi for a list of packages in "
                      "updates-testing (%s)..." % release)
            self.debug("starting bodhi query")
            testing_updates = self.query_bodhi(bc, release, pending=False)
            print("found {} testing updates".format(len(testing_updates)))
            # can't query for requestless as of python-fedora 0.3.18
            # (request=None results in no filtering by request)
            testing_updates = [x for x in testing_updates
                               if not x["request"]]

            # extend list of updates with updates that are going to testing to
            # support manually installed rpms from koji
            pending_updates = self.query_bodhi(bc, release, pending=True)
            testing_updates.extend(pending_updates)
            del pending_updates

            if self.options.bodhi_update_cache:
                try:
                    os.makedirs(self.options.datadir)
                except OSError:
                    # :TODO: only pass for Errno 17: file exists
                    self.debug("makedirs OSError", update_timestamp=False)

                self.debug("writing cache")
                with open(cachefile_name, "wb") as outfile:
                    pickle.dump(testing_updates, outfile, -1)

        ignorefile_name = os.path.join(self.options.datadir, "ignore.cpickle")
        previously_ignored_updates = []
        self.debug("reading ignore file %s" % ignorefile_name)
        try:
            # NOTE: same trust assumption as for the bodhi cache above
            with open(ignorefile_name, "rb") as ignorefile:
                previously_ignored_updates = pickle.load(ignorefile)
        except IOError:
            self.debug("Cannot access ignore file: %s" % ignorefile_name)

        self.debug("post processing bodhi query")
        # reduce to unapproved critpath updates. Cannot query for this in
        # python-fedora 0.3.20 and might not want to do to keep the cache
        # complete
        if self.options.critpath_only:
            testing_updates = [u for u in testing_updates
                               if u["critpath"] and
                               not u["critpath_approved"]]

        # create a mapping build -> update
        testing_builds = {}
        for update in testing_updates:
            if self.options.include_commented or not \
                    self.already_commented(update,
                                           self.options.fas_username):
                for build in update["builds"]:
                    testing_builds[build["nvr"]] = update

        self.debug("starting feedback loop")
        # multiple build can be grouped together in one update, only ask once
        # per update
        processed_updates = []
        ignored_updates = []
        builds = sorted(testing_builds)

        if not builds:
            print("No testing packages found, install some with: "
                  "'%s update --enablerepo=\"*-testing\"'" %
                  pkghelper.package_manager)

        for build in builds:
            update = testing_builds[build]

            # Do not query for previously ignored updates
            # Store update title to save these to a file
            if not self.options.include_ignored and \
                    update["title"] in previously_ignored_updates:
                print("ignored: %s" % update["title"])
                ignored_updates.append(update["title"])
                continue

            # Ignore own updates
            if self.options.ignore_own and \
                    update["submitter"] == self.options.fas_username:
                continue

            if update not in processed_updates and \
                    build in installed_testing_builds:
                processed_updates.append(update)

                affected_builds = [b["nvr"] for b in update["builds"]]
                installed_pkgs = list(
                    itertools.chain(*[installed_testing_builds[b]
                                      for b in affected_builds
                                      if b in installed_testing_builds])
                )

                if args:
                    installed_pkgs_names = ["%(name)s" % pkg
                                            for pkg in installed_pkgs]
                    # remove version and release
                    affected_builds_names = ["-".join(b.split("-")[:-2])
                                             for b in affected_builds]
                    if not self.match_any(args, [installed_pkgs_names,
                                                 affected_builds_names]):
                        continue

                installed_rpms = [
                    self.format_rpm(pkg) for pkg in installed_pkgs]

                if self.options.ipdb:
                    import ipdb
                    ipdb.set_trace()

                if not self.options.list_rpms_only:
                    print(FEK_helper.bodhi_update_str(
                        update,
                        bodhi_base_url=bc.base_url,
                        width=self.options.wrap_width,
                        wrap_bugs=self.options.wrap_bugs
                    ))
                    if self.options.wrap_rpms:
                        print(FEK_helper.wrap_paragraphs_prefix(
                            installed_rpms,
                            first_prefix=" inst. RPMS: ",
                            width=self.options.wrap_width))
                    else:
                        indentation = "\n" + " " * 11 + ": "
                        rpmlist = indentation.join(installed_rpms)
                        print(" inst. RPMS: %s\n" % rpmlist)

                    if self.already_commented(update,
                                              self.options.fas_username):
                        print("!!! already commented by you !!!")
                    try:
                        karma = self.raw_input(
                            PROMPT, default=self.options.default_karma,
                            add_to_history=False)
                        if karma in ["-1", "0", "1"]:
                            comment = self.raw_input(
                                "Comment> ",
                                default=self.options.default_comment)
                            if comment or \
                                    not self.options.skip_empty_comment:
                                result = self.send_comment(bc, update,
                                                           comment, karma)
                                if not result[0]:
                                    self.warning("Comment not submitted: %s" %
                                                 result[1])
                            else:
                                print("skipped because of empty comment")
                        elif karma == "i":
                            ignored_updates.append(update["title"])
                            print("ignored as requested")
                    except EOFError:
                        # The remaining updates are not visited anymore,
                        # therefore keep everything that was ignored before.
                        ignored_updates.extend(previously_ignored_updates)
                        sys.stdout.write("\nExiting on User request\n")
                        break
                else:
                    print("\n".join(installed_rpms))

        # store ignored_updates
        try:
            os.makedirs(self.options.datadir)
        except OSError:
            # :TODO: only pass for Errno 17: file exists
            self.debug("makedirs OSError", update_timestamp=False)

        self.debug("writing ignore file")
        with open(ignorefile_name, "wb") as outfile:
            pickle.dump(ignored_updates, outfile, -1)

    def query_bodhi(self, bodhi_client, release, pending=False):
        """Deal with querying bodhi and combining all relevant pages into a
        single list of updates.

        bodhi_client: client object providing ``query(**kwargs)``
        release: release name to query for, e.g. product plus releasever
        pending: query pending updates that requested testing instead of
            updates that are in testing

        Any exception raised by the query is reported on stdout and then
        re-raised.
        """
        query_args = {"release": release,
                      "limit": 1000}
        if pending:
            query_args["request"] = "testing"
            query_args["status"] = "pending"
        else:
            query_args["status"] = "testing"

        updates = []
        try:
            # since bodhi has a query limit but multiple pages, get ALL of the
            # updates before starting to process
            result = bodhi_client.query(**query_args)
            self.debug("Queried Bodhi page 1", False)
            updates.extend(result['updates'])
            while result['page'] < result['pages']:
                next_page = result['page'] + 1
                self.info("Fetching updates page {} of {}".format(
                    next_page, result['pages']))
                result = bodhi_client.query(page=next_page, **query_args)
                self.debug("Queried Bodhi page %s" % next_page, False)
                updates.extend(result['updates'])
        # There is no clear indication which Exceptions bc.query() might
        # throw, therefore catch all (python-fedora-0.3.32.3-1.fc19)
        except Exception as e:
            print("Error while querying Bodhi: {0}".format(e))
            # bare raise keeps the original traceback
            raise

        return updates

    def already_commented(self, update, user):
        """Return True if ``user`` left a non-anonymous comment on update."""
        for comment in update["comments"]:
            if not comment["anonymous"] and comment["user"]["name"] == user:
                return True
        return False

    def debug(self, message, update_timestamp=True):
        """Write a debug message to stderr if debugging is enabled.

        The message is extended with the time passed since the timestamp
        stored in ``self.options.debug``. That timestamp is set to the
        current time unless ``update_timestamp`` is false.
        """
        if self.options.debug:
            now = datetime.datetime.now()
            delta = now - self.options.debug
            message = "DEBUG: %s - timedelta: %s" % (message, delta)
            if update_timestamp:
                self.options.debug = now
            else:
                message = "%s - timestamp not updated" % message
            sys.stderr.write("%s\n" % message)

    def format_rpm(self, rpm):
        """Return a one-line description of an installed package.

        ``rpm`` needs to provide ``installtime`` as attribute and name,
        version, release, arch and summary via subscription.
        """
        now = datetime.datetime.now()
        install_age = (now - datetime.datetime.fromtimestamp(rpm.installtime))
        res = "%(name)s-%(version)s-%(release)s.%(arch)s - %(summary)s" % rpm
        res += " (installed %s days ago)" % install_age.days
        return res

    def info(self, message):
        """Write ``message`` followed by a newline to stderr."""
        sys.stderr.write("%s\n" % message)

    def match_any(self, patterns, names):
        """Return True if any name matches any of the fnmatch patterns.

        ``names`` is a list of lists of names.
        """
        for name in itertools.chain(*names):
            for pattern in patterns:
                if fnmatch.fnmatch(name, pattern):
                    return True
        return False

    def warning(self, message):
        """Write ``message`` prefixed with "Warning: " to stderr."""
        sys.stderr.write("Warning: %s\n" % message)

    def raw_input(self, prompt, default="", add_to_history=True):
        """Prompt for a line of input with ``default`` already filled in.

        If ``add_to_history`` is false, the last readline history item is
        removed again after the input was read. EOFError raised by the
        prompt is passed on to the caller.
        """
        def pre_input_hook():
            readline.insert_text(default)
            readline.redisplay()

        readline.set_pre_input_hook(pre_input_hook)
        try:
            return raw_input(prompt)
        finally:
            # runs for both normal return and EOFError
            readline.set_pre_input_hook(None)
            if not add_to_history:
                try:
                    readline.remove_history_item(
                        readline.get_current_history_length() - 1)
                # raised when CTRL-D is used on first prompt
                except ValueError:
                    pass

    def send_comment(self, bc, update, comment, karma):
        """Submit a comment with karma for ``update`` via the bodhi client.

        After an authentication error the FAS password is asked for before
        the next attempt. Returns ``(True, result)`` on success and
        ``(False, 'too many errors')`` if all ``options.retries + 1``
        attempts failed.
        """
        # The retries are handled here; the value of the client is restored
        # on every way out of this method.
        orig_retries = bc.retries
        bc.retries = 1
        try:
            for retry in range(0, self.options.retries + 1):
                try:
                    res = bc.comment(update["title"], comment, karma=karma)
                    return (True, res)
                except fedora.client.AuthError:
                    self.warning("Authentication error")
                    bc.password = getpass.getpass(
                        'FAS password for %s: ' % self.options.fas_username)
                except fedora.client.ServerError as e:
                    self.warning("Server error: %s" % str(e))
        finally:
            bc.retries = orig_retries

        return (False, 'too many errors')


if __name__ == "__main__":
    try:
        fek = FedoraEasyKarma()
    except KeyboardInterrupt:
        print("aborted")
        sys.exit(0)