#!/usr/bin/python
# vim: fileencoding=utf8 foldmethod=marker
# {{{ License header: GPLv2+
# This file is part of cnucnu.
#
# Cnucnu is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 2 of the License, or
# (at your option) any later version.
#
# Cnucnu is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with cnucnu. If not, see .
# }}}
""" :author: Till Maas
:contact: opensource@till.name
:license: GPLv2+
"""
__docformat__ = "restructuredtext"
# python default modules
import fnmatch
import re
# sre_constants contains re exceptions
import sre_constants
import string
import subprocess
import urllib
#extra modules
import pycurl
from fedora.client.pkgdb import PackageDB
# cnucnu modules
from cnucnu.bugzilla_reporter import BugzillaReporter
from cnucnu.config import global_config
import cnucnu.errors as cc_errors
from cnucnu import helper
from cnucnu.helper import cmp_upstream_repo, get_html, expand_subdirs, \
upstream_max
from cnucnu.scm import SCM
from cnucnu.wiki import MediaWiki
class Repository:
def __init__(self, name="", path=""):
if not (name and path):
c = global_config.config["repo"]
name = c["name"]
path = c["path"]
self.name = name
self.path = path
self.repoid = "cnucnu-%s" % "".join(
c for c in name if c in string.letters)
self.repofrompath = "%s,%s" % (self.repoid, self.path)
self._nvr_dict = None
@property
def nvr_dict(self):
if not self._nvr_dict:
self._nvr_dict = self.repoquery()
return self._nvr_dict
def repoquery(self, package_names=[]):
# TODO: get rid of repofrompath message even with --quiet
cmdline = ["/usr/bin/repoquery",
"--quiet",
"--archlist=src",
"--all",
"--repoid",
self.repoid,
"--qf",
"%{name}\t%{version}\t%{release}"]
if self.repofrompath:
cmdline.extend(['--repofrompath', self.repofrompath])
cmdline.extend(package_names)
repoquery = subprocess.Popen(cmdline, stdout=subprocess.PIPE)
(list_, stderr) = repoquery.communicate()
new_nvr_dict = {}
for line in list_.split("\n"):
if line != "":
name, version, release = line.split("\t")
new_nvr_dict[name] = (version, release)
return new_nvr_dict
def package_version(self, package):
try:
return self.nvr_dict[package.name][0]
except KeyError:
raise cc_errors.PackageNotFoundError(
"package '%s' not found in repository '%s' (%s)" % (
package.name, self.name, self.path))
def package_release(self, package):
return self.nvr_dict[package.name][1]
class Package(object):
def __init__(self, name, regex, url, repo=Repository(), scm=SCM(),
br=BugzillaReporter(), package_list=None):
# :TODO: add some sanity checks
self.name = name
self.raw_regex = None
self.raw_url = None
self.regex = regex
self.url = url
self._html = None
self._latest_upstream = None
self._upstream_versions = None
self._repo_version = None
self._repo_release = None
self._rpm_diff = None
self.repo = repo
self.repo_name = repo.name
self.scm = scm
self.br = br
self.package_list = package_list
def _invalidate_caches(self):
self._latest_upstream = None
self._upstream_versions = None
self._rpm_diff = None
def __str__(self):
return "%(name)s: repo=%(repo_version)s "\
"upstream=%(latest_upstream)s" % self
def __repr__(self):
return "%(name)s %(regex)s %(url)s" % self
def __getitem__(self, key):
return getattr(self, key)
def set_regex(self, regex):
self.raw_regex = regex
name = self.name
# allow name override with e.g. DEFAULT:othername
if regex:
name_override = re.match(r"^((?:FM-)?DEFAULT)(?::(.+))$", regex)
if name_override:
regex = name_override.group(1)
name = name_override.group(2)
# use DEFAULT regex but alter the name
if regex == "CPAN-DEFAULT":
# strip "perl-" prefix only if name was not overridden
if not name_override and name.startswith("perl-"):
name = name[len("perl-"):]
regex = "DEFAULT"
elif regex == "HACKAGE-DEFAULT":
# strip "ghc-" prefix only if name was not overridden
if not name_override and name.startswith("ghc-"):
name = name[len("ghc-"):]
regex = "DEFAULT"
elif regex == "PEAR-DEFAULT":
# strip "php-pear-" prefix only if name was not overridden
if not name_override and name.startswith("php-pear-"):
name = name[len("php-pear-"):].replace("-", "_")
regex = "DEFAULT"
elif regex == "PECL-DEFAULT":
# strip "php-pecl-" prefix only if name was not overridden
if not name_override and name.startswith("php-pecl-"):
name = name[len("php-pecl-"):].replace("-", "_")
regex = "DEFAULT"
elif regex == "RUBYGEMS-DEFAULT":
# strip "rubygem-" prefix only if name was not overridden
if not name_override and name.startswith("rubygem-"):
name = name[len("rubygem-"):]
elif regex == "NPM-DEFAULT":
# strip "nodejs-" prefix only if name was not overridden
if not name_override and name.startswith("nodejs-"):
name = name[len("nodejs-"):]
# no elif here, because the previous regex aliases are only for name
# altering
if regex == "DEFAULT":
regex = (
r"(?i)" # ignore case
r"\b%(name)s[-_]" # word-boundary, name and dash/underscore
r"(?:(?:src|source)[-_])?" # optional src or source string
r"([^-/_\s]*?" #
r"\d"
r"[^-/_\s]*?)"
r"(?:[-_.](?:src|source|orig))?"
r"\.(?:[jt]ar|t[bglx]z|tbz2|zip)\b" % {'name': re.escape(name)}
)
elif regex == "DRUPAL-DEFAULT":
branch = name[6]
regex = \
"(?s)Recommended releases.*?>{0}.x-([^<]*)".format(branch)
elif regex == "FM-DEFAULT":
regex = '([^<]*)'
elif regex == "DIR-LISTING-DEFAULT":
regex = 'href="([0-9][0-9.]*)/"'
elif regex == "RUBYGEMS-DEFAULT":
regex = \
'"gem_uri":"http:\/\/rubygems.org\/gems\/%s-([0-9.]*?)\.gem"' \
% re.escape(name)
elif regex == "NPM-DEFAULT":
regex = '"version":"([0-9.]*?)"'
self.__regex = regex
self._invalidate_caches()
regex = property(lambda self: self.__regex, set_regex)
def set_url(self, url):
self.raw_url = url
name = self.name
# allow name override with e.g. SF-DEFAULT:othername
if url:
name_override = re.match(
r"^((?:SF|FM|GNU|CPAN|DRUPAL|HACKAGE|DEBIAN|GOOGLE|PEAR|PECL|"
"PYPI|LP|GNOME|RUBYGEMS)-DEFAULT)(?::(.+))$", url)
if name_override:
url = name_override.group(1)
name = name_override.group(2)
name = urllib.quote(name, safe='')
if url == "SF-DEFAULT":
url = "http://sourceforge.net/api/file/index/project-name/%s/" \
"mtime/desc/limit/200/rss" % name
elif url == "FM-DEFAULT":
url = "http://freshmeat.net/projects/%s" % name
elif url == "GNU-DEFAULT":
url = "http://ftp.gnu.org/gnu/%s/" % name
elif url == "CPAN-DEFAULT":
# strip "perl-" prefix only if name was not overridden
if not name_override and name.startswith("perl-"):
name = name[len("perl-"):]
url = "http://search.cpan.org/dist/%s/" % name
elif url == "DRUPAL-DEFAULT":
if not name_override and (name.startswith("drupal7-") or
name.startswith("drupal6-")):
name = name[len("drupalX-"):]
url = "http://drupal.org/project/{0}".format(name)
elif url == "HACKAGE-DEFAULT":
# strip "ghc-" prefix only if name was not overridden
if not name_override and name.startswith("ghc-"):
name = name[len("ghc-"):]
url = "http://hackage.haskell.org/package/%s" % name
elif url == "DEBIAN-DEFAULT":
url = "http://ftp.debian.org/debian/pool/main/%s/%s/" % \
(name[0], name)
elif url == "GOOGLE-DEFAULT":
url = "http://code.google.com/p/%s/downloads/list" % name
elif url == "PYPI-DEFAULT":
url = "https://pypi.python.org/packages/source/%s/%s/" % \
(name[0], name)
elif url == "PEAR-DEFAULT":
# strip "php-pear-" prefix only if name was not overridden
if not name_override and name.startswith("php-pear-"):
name = name[len("php-pear-"):].replace("-", "_")
url = "http://pear.php.net/package/%s/download" % name
elif url == "PECL-DEFAULT":
# strip "php-pecl-" prefix only if name was not overridden
if not name_override and name.startswith("php-pecl-"):
name = name[len("php-pecl-"):].replace("-", "_")
url = "http://pecl.php.net/package/%s/download" % name
elif url == "LP-DEFAULT":
url = "https://launchpad.net/%s/+download" % name
elif url == "GNOME-DEFAULT":
url = "http://download.gnome.org/sources/%s/*/" % name
elif url == "RUBYGEMS-DEFAULT":
# strip "rubygem-" prefix only if name was not overridden
if not name_override and name.startswith("rubygem-"):
name = name[len("rubygem-"):]
url = "http://rubygems.org/api/v1/gems/%s.json" % name
elif url == "NPM-DEFAULT":
# strip "nodejs-" prefix only if name was not overridden
if not name_override and name.startswith("nodejs-"):
name = name[len("nodejs-"):]
url = "http://registry.npmjs.org/%s" % name
self.__url = url
self.html = None
url = property(lambda self: self.__url, set_url)
def set_html(self, html):
self._html = html
self._invalidate_caches()
def get_html(self):
if not self._html:
try:
self.__url = expand_subdirs(self.url)
html = get_html(self.url)
# TODO: get_html should raise a generic retrieval error
except IOError:
raise cc_errors.UpstreamVersionRetrievalError(
"%(name)s: IO error while retrieving upstream URL. - "
"%(url)s - %(regex)s" % self)
except pycurl.error, e:
raise cc_errors.UpstreamVersionRetrievalError(
"%(name)s: Pycurl while retrieving upstream URL. - "
"%(url)s - %(regex)s" % self + " " + str(e))
self._html = html
return self._html
html = property(get_html, set_html)
@property
def upstream_versions(self):
if not self._upstream_versions:
try:
upstream_versions = re.findall(self.regex, self.html)
except sre_constants.error:
raise cc_errors.UpstreamVersionRetrievalError(
"%s: invalid regular expression" % self.name)
for index, version in enumerate(upstream_versions):
if type(version) == tuple:
version = ".".join([v for v in version if not v == ""])
upstream_versions[index] = version
if " " in version:
raise cc_errors.UpstreamVersionRetrievalError(
"%s: invalid upstream version:>%s< - %s - %s " % (
self.name, version, self.url, self.regex))
if len(upstream_versions) == 0:
raise cc_errors.UpstreamVersionRetrievalError(
"%(name)s: no upstream version found. - %(url)s - "
"%(regex)s" % self)
self._upstream_versions = upstream_versions
# invalidate sub caches
self._latest_upstream = None
self._rpm_diff = None
return self._upstream_versions
@property
def latest_upstream(self):
if not self._latest_upstream:
self._latest_upstream = upstream_max(self.upstream_versions)
# invalidate _rpm_diff cache
self._rpm_diff = None
return self._latest_upstream
@property
def nagging(self):
return self.name not in self.package_list.ignore_packages
@property
def repo_version(self):
if not self._repo_version:
self._repo_version = self.repo.package_version(self)
return self._repo_version
@property
def repo_release(self):
if not self._repo_release:
self._repo_release = self.repo.package_release(self)
return self._repo_release
@property
def rpm_diff(self):
if not self._rpm_diff:
self._rpm_diff = cmp_upstream_repo(self.latest_upstream,
(self.repo_version,
self.repo_release))
return self._rpm_diff
@property
def upstream_newer(self):
return self.rpm_diff == 1
@property
def repo_newer(self):
return self.rpm_diff == -1
@property
def status(self):
if self.upstream_newer:
return "(outdated)"
elif self.repo_newer:
return "(%(repo_name)s newer)" % self
else:
return ""
@property
def upstream_version_in_scm(self):
return self.scm.has_upstream_version(self)
@property
def exact_outdated_bug(self):
return self.br.get_exact_outdated_bug(self)
@property
def open_outdated_bug(self):
return self.br.get_open_outdated_bug(self)
def report_outdated(self, dry_run=True):
if self.nagging:
if not self.upstream_newer:
print "Upstream not newer, report_outdated aborted!", str(self)
return None
if self.upstream_version_in_scm:
print "Upstream Version found in SCM, skipping bug report: "\
"%(name)s U:%(latest_upstream)s R:%(repo_version)s" % self
return None
return self.br.report_outdated(self, dry_run)
else:
print "Nagging disabled for package: %s" % str(self)
return None
class PackageList:
def __init__(self, repo=Repository(), scm=SCM(), br=BugzillaReporter(),
mediawiki=False, packages=None):
""" A list of packages to be checked.
:Parameters:
repo : `cnucnu.Repository`
Repository to compare with upstream
scm : `cnucnu.SCM`
SCM to compares sources files with upstream version
mediawiki : dict
Get a list of package names, urls and regexes from a mediawiki
page defined in the dict.
packages : [cnucnu.Package]
List of packages to populate the package_list with
"""
self.ignore_owners = []
self._ignore_packages = None
if not mediawiki:
mediawiki = global_config.config["package list"]["mediawiki"]
if not packages and mediawiki:
w = MediaWiki(base_url=mediawiki["base url"])
page_text = w.get_pagesource(mediawiki["page"])
ignore_owner_regex = re.compile('\\* ([^ ]*)')
self.ignore_owners = [
o[0].encode("UTF-8") for o in
helper.match_interval(page_text, ignore_owner_regex,
"== Package Owner Ignore List ==",
"")
]
packages = []
repo.package_list = self
package_line_regex = re.compile(
'^\s+\\*\s+(\S+)\s+(.+?)\s+(\S+)\s*$')
for package_data in helper.match_interval(
page_text, package_line_regex,
"== List Of Packages ==", ""):
(name, regex, url) = package_data
matched_names = fnmatch.filter(repo.nvr_dict.keys(), name)
if len(matched_names) == 0:
# Add non-matching name to trigger an error/warning later
# FIXME: Properly report bad names
matched_names = [name]
for name in matched_names:
packages.append(
Package(name, regex, url, repo, scm, br,
package_list=self))
self.packages = packages
self.append = self.packages.append
self.__len__ = self.packages.__len__
@property
def ignore_packages(self):
if self._ignore_packages is None:
pdb = PackageDB(retries=5)
ignore_packages = []
for owner in self.ignore_owners:
pkgs = pdb.user_packages(owner, acls="owner")["pkgs"]
p_names = [p["name"] for p in pkgs]
ignore_packages.extend(p_names)
ignore_packages = set(ignore_packages)
self._ignore_packages = ignore_packages
return self._ignore_packages
def __getitem__(self, key):
if isinstance(key, int):
return self.packages[key]
elif isinstance(key, str):
for p in self.packages:
if p.name == key:
return p
raise KeyError("Package %s not found" % key)
def get(self, key, default=None):
try:
return self.__getitem__(key)
except KeyError:
return default
if __name__ == '__main__':
pl = PackageList()
p = pl.packages[0]
print p.upstream_versions