diff options
author | Jonathan Dieter <jdieter@gmail.com> | 2007-07-11 18:37:03 +0300 |
---|---|---|
committer | Jonathan Dieter <jdieter@gmail.com> | 2007-07-11 18:37:03 +0300 |
commit | ec53c5c2b2f6ea31b22c47e4ae8f71c4eff2cd37 (patch) | |
tree | 8f6deaef1c82c121fc71d5b7b10fa2b0ac0fa853 /yum-presto/presto.py | |
parent | dce0600bc64c793ba6e8f67c56c286d8d97e7c4c (diff) | |
download | presto-ec53c5c2b2f6ea31b22c47e4ae8f71c4eff2cd37.tar.gz presto-ec53c5c2b2f6ea31b22c47e4ae8f71c4eff2cd37.tar.xz presto-ec53c5c2b2f6ea31b22c47e4ae8f71c4eff2cd37.zip |
Complete rewrite of yum-presto. New xml format.
Signed-off-by: Jonathan Dieter <jdieter@gmail.com>
Diffstat (limited to 'yum-presto/presto.py')
-rw-r--r-- | yum-presto/presto.py | 464 |
1 files changed, 350 insertions, 114 deletions
diff --git a/yum-presto/presto.py b/yum-presto/presto.py index 48db24d..255372c 100644 --- a/yum-presto/presto.py +++ b/yum-presto/presto.py @@ -18,36 +18,342 @@ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # Copyright 2005 Duke University # Copyright 2007 Jonathan Dieter - -from yum.plugins import TYPE_INTERACTIVE, PluginYumExit -from yum import config +# Copyright 2007 Red Hat, Inc. -- Jeremy Katz <katzj@redhat.com> import os import sys +import subprocess +import gzip +import thread +import threading +import Queue +try: + from cElementTree import iterparse +except: + from xml.etree.cElementTree import iterparse + +from yum.plugins import TYPE_CORE, PluginYumExit +import yum.Errors +import yum.misc +from urlgrabber.grabber import URLGrabError -sys.path.append("/usr/share/presto") -import deltarpm -from prestoRepo import PrestoRepository -from prestomdparser import PrestoMDParser -import prestoTransaction -import prestoLog -import prestoDownload requires_api_version = '2.1' -LOG_FILE = "/var/log/presto.log" -plugin_type = (TYPE_INTERACTIVE,) +plugin_type = (TYPE_CORE,) + +# mapping of repo.id -> PrestoInfo +pinfo = {} + +def verifyDelta(sequence, arch): + if subprocess.call(["/usr/bin/applydeltarpm", "-a", arch, + "-C", "-s", sequence]): + return False + return True + +def applyDelta(deltarpmfile, newrpmfile, arch): + if subprocess.call(["/usr/bin/applydeltarpm", "-a", arch, + deltarpmfile, newrpmfile]): + return False + return True + +def reconstruct(conduit, rpmlocal, rpmarch, deltalocal): + retlist = "" + + if not applyDelta(deltalocal, rpmlocal, rpmarch): + retlist += "Error rebuilding rpm from %s! Will download full package.\n" % os.path.basename(deltalocal) + try: + os.unlink(rpmlocal) + except: + pass + else: + # Check to see whether or not we should keep the drpms + # FIXME: Is there any way to see whether or not a Boolean option was not set? + if conduit.confBool('main', 'neverkeepdeltas'): + delete = True + elif conduit.confBool('main', 'keepdeltas'): + delete = False + elif conduit.getConf().keepcache != 0: + delete = False + else: + delete = True + + if delete: + try: + os.unlink(deltalocal) + except: + pass + return retlist + +class ReconstructionThread(threading.Thread): + def __init__(self, queue, lock, run_function): + threading.Thread.__init__(self) + self.run_function = run_function + self.queue = queue + self.lock = lock + self.can_exit = False + self.messages = "" + + def run(self): + while True: + try: + retval = self.queue.get(not self.can_exit) + except Queue.Empty: + # If we're done with our drpms and no more are coming, let's + # blow this joint + break + if retval != None: + messages = apply(self.run_function, retval) + if self.can_exit: + # If there are not going to be any more new drpms, + # send messages directly to conduit + conduit = retval[0] + if self.messages != "": + conduit.info(2, self.messages[:-1]) + self.messages = "" + if messages != "": + conduit.info(2, messages[:-1]) + else: + # We may be downloading drpms still, so queue messages + self.lock.acquire() + self.messages += messages + self.lock.release() + + +def getDelta(po, presto, rpmdb): + """Does the package have a reasonable delta for us to use?""" + + # local packages don't make sense to use a delta for... + if hasattr(po, 'pkgtype') and po.pkgtype == 'local': + return None + if po.remote_url.startswith("file:/"): + # kind of a hack, but file:/ repos are basically local + return None + + # if there's not presto info for the repo, we don't have a delta for + # the package + if not presto.has_key(po.repo.id): + return None + deltainfo = presto[po.repo.id] + + # any deltas for the new package in the repo? + nevra = "%s-%s:%s-%s.%s" %(po.name, po.epoch, po.version, + po.release, po.arch) + if not deltainfo.has_key(nevra): + return None + deltas = deltainfo[nevra] + + # check to see if we've already got the full package + local = po.localPkg() + if os.path.exists(local): + cursize = os.stat(local)[6] + totsize = long(po.size) + if po.verifyLocalPkg(): # we've got it. + return None + if cursize < totsize: # we have part of the file; do a reget + return None + os.unlink(local) + + # did we have a previous package of the same arch installed? + installed = rpmdb.searchNevra(po.name, None, None, None, po.arch) + if len(installed) == 0: + return None + + # now, let's see if there's a delta for us... + bestdelta = None + + for oldpo in installed: + evr = "%s:%s-%s" %(oldpo.epoch, oldpo.version, oldpo.release) + if not deltas.has_key(evr): + continue + delta = deltas[evr] + + # we just want to use the smallest delta + if bestdelta and delta['size'] >= bestdelta['size']: + continue + + if not verifyDelta(delta['sequence'], po.arch): + continue + + bestdelta = delta + + return bestdelta + + +def downloadPkgs(conduit, presto): + """download list of package objects handed to you, return errors""" + + errors = {} + def adderror(po, msg): + errors.setdefault(po, []).append(msg) + + # Set up thread for applying drpms + queue = Queue.Queue(0) + lock = thread.allocate_lock() + curthread = ReconstructionThread(queue, lock, reconstruct) + curthread.start() + + remote_pkgs = [] + + # see which deltas we need to download; if the delta is already + # downloaded, we can start it reconstructing in the background + for po in conduit.getDownloadPackages(): + delta = getDelta(po, presto, conduit.getRpmDB()) + if delta is None: + continue + + # verify the delta if it already exists + deltadir = os.path.join(po.repo.cachedir, 'deltas') + if not os.path.isdir(deltadir): + try: + os.mkdir(deltadir) + except OSError: + continue + deltapath = os.path.join(deltadir, + os.path.basename(delta['filename'])) + if os.path.exists(deltapath): + try: + conduit._base.verifyChecksum(deltapath, delta['checksum_type'], + delta['checksum']) + except URLGrabError, e: + if po.repo.cache: + raise Errors.RepoError, "Caching enabled and local cache for %s doesn't match checksum" %(deltapath,) + else: + cursize = os.stat(deltapath)[6] + totsize = long(delta['size']) + if cursize >= totsize: + os.unlink(deltapath) + + remote_packages.append( (po, delta) ) + else: + # Deltarpm is local and good, put it in the rebuild thread. + conduit.info(5, "using local copy of deltarpm for %s" % po) + queue.put((conduit, po.localpath, po.arch, deltapath)) + continue + else: + remote_pkgs.append( (po, delta) ) + + # now we need to do downloads + i = 0 + for (po, delta) in remote_pkgs: + i += 1 + # FIXME: verifyChecksum should handle the urlgrabber objects... + checkfunc = (lambda fo, csumtype, csum: + conduit._base.verifyChecksum(fo.filename, csumtype, csum), + (delta['checksum_type'], + delta['checksum']), {}) + + deltadir = os.path.join(po.repo.cachedir, 'deltas') + deltapath = os.path.join(deltadir, + os.path.basename(delta['filename'])) + + # FIXME: this should be moved into _getFile + dirstat = os.statvfs(deltadir) + if (dirstat.f_bavail * dirstat.f_bsize) <= long(po.size): + adderror(po, 'Insufficient space in download directory %s ' + 'to download' % (deltadir,)) + continue + try: + text = "(%s/%s): %s" %(i, len(remote_pkgs), + os.path.basename(delta['filename'])) + deltafile = po.repo._getFile(url=po.basepath, + relative=delta['filename'], + local=deltapath, + checkfunc=checkfunc, + text=text, + cache=po.repo.cache) + except Errors.RepoError, e: + adderror(po, str(e)) + else: + queue.put((conduit, po.localpath, po.arch, deltafile)) + + if errors.has_key(po): + del errors[po] + + # Check for waiting messages from building thread + lock.acquire() + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" + lock.release() + + conduit.info(2, "Rebuilding rpms from deltarpms") + + # Tell build thread that there are no more drpms and wait for it to exit + curthread.can_exit = True + queue.put(None) + curthread.join() + + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" + + return errors + +class DeltaInfo(object): + def __init__(self, elem): + self.epoch = elem.get("oldepoch") + self.version = elem.get("oldversion") + self.release = elem.get("oldrelease") + + self.filename = self.sequence = self.size = self.checksum = self.checksum_type = None + + for x in elem.getchildren(): + if x.tag == "checksum": + self.checksum_type = x.get("type") + setattr(self, x.tag, x.text) + + def evr(self): + return "%s:%s-%s" %(self.epoch, self.version, self.release) + + def __str__(self): + return "filename: %s, sequence: %s, size: %s, checksum (%s) = %s" % (self.filename, self.sequence, self.size, self.checksum_type, self.checksum) + + def __getitem__(self, key): + return getattr(self, key) + +class NewPackage(object): + def __init__(self, elem): + for prop in ("name", "version", "release", "epoch", "arch"): + setattr(self, prop, elem.get(prop)) + + self.deltas = {} + for child in elem.getchildren(): + if child.tag != "delta": + continue + d = DeltaInfo(child) + self.deltas[d.evr()] = d + + def nevra(self): + return "%s-%s:%s-%s.%s" %(self.name, self.epoch, self.version, + self.release, self.arch) + + def __str__(self): + return "%s <== %s" % (self.nevra(), self.deltas) + + def has_key(self, key): + return self.deltas.has_key(key) + def __getitem__(self, key): + return self.deltas[key] + +class PrestoParser(object): + def __init__(self, filename): + self.deltainfo = {} + + if filename.endswith(".gz"): + fo = gzip.open(filename) + else: + fo = open(filename, 'rt') + for event, elem in iterparse(fo): + if elem.tag == "newpackage": + p = NewPackage(elem) + self.deltainfo[p.nevra()] = p + + def getDeltas(self): + return self.deltainfo -rpm_size = 0 -drpm_size = 0 -drpm_count = 0 -log = None # Configuration stuff def config_hook(conduit): - # Set up repository specific deltarpm url and mirrorlist - config.RepoConf.deltaurl = config.UrlListOption() - config.RepoConf.deltamirrorlist = config.UrlOption() - # Add --disable-presto option parser = conduit.getOptParser() parser.add_option('', '--disablepresto', dest='disablepresto', @@ -58,106 +364,36 @@ def config_hook(conduit): def postreposetup_hook(conduit): opts, commands = conduit.getCmdLine() if not opts.disablepresto: - conduit.info(2, 'Setting up Presto') + conduit.info(2, 'Setting up and reading Presto delta metadata') for active_repo in conduit.getRepos().listEnabled(): - p_repo = PrestoRepository(active_repo, conduit) - p_repo.setup(conduit.getConf().cache) - - conduit.info(2, 'Reading Presto metadata in from local files') - for active_repo in conduit.getRepos().listEnabled(): - xml = active_repo.p_repo.getPrestoXML() - if active_repo.p_repo.enabled: - xmldata = active_repo.p_repo.repoXML.getData('deltas') - (ctype, csum) = xmldata.checksum - parser = PrestoMDParser(xml) - active_repo.p_repo.deltalist = parser.getDeltaList() + try: + deltamd = active_repo.retrieveMD("prestodelta") + except yum.Errors.RepoMDError: + conduit.info(2, "No Presto metadata available for %s" %(active_repo,)) + continue + pinfo[active_repo.id] = PrestoParser(deltamd).getDeltas() else: conduit.info(5, '--disablepresto specified - Presto disabled') - -def postresolve_hook(conduit): - global rpm_size - global drpm_size - global drpm_count - + +def predownload_hook(conduit): opts, commands = conduit.getCmdLine() - if not opts.disablepresto: - # Cycle through packages to see if there's a deltarpm available - for newpkg in conduit.getTsInfo(): - if newpkg.ts_state != "e": - (chosen_drpm, installed, local, drpm_enabled) = prestoTransaction.find_available_drpms(conduit, newpkg) - - # If a drpm was found, change certain package information so it reflects - # the drpm, not the rpm. - if chosen_drpm != None: - newpkg.po.has_drpm = True - conduit.info(2, "Found deltarpm update for %s.%s %s:%s-%s" % (newpkg.name, newpkg.arch, newpkg.epoch, newpkg.version, newpkg.release)) - # In yum 3.0.x, this doesn't get defined if you run "yum update x" rather than "yum update" - rpm_size += int(newpkg.po.size) - drpm_size += int(chosen_drpm['size']) - newpkg.po.realpackagesize = newpkg.po.size - if hasattr(newpkg.po, 'packagesize'): - newpkg.po.packagesize = chosen_drpm['size'] - else: - newpkg.po.simple['packagesize'] = chosen_drpm['size'] - newpkg.po.deltasize = chosen_drpm['size'] - newpkg.po.deltarelativepath = chosen_drpm['drpm_filename'] - newpkg.po.deltachecksumtype = chosen_drpm['checksum_type'] - newpkg.po.deltachecksum = chosen_drpm['checksum'] - newpkg.po.deltalocalpath = newpkg.po.repo.deltasdir + "/" + os.path.basename(chosen_drpm['drpm_filename']) - newpkg.po.to = newpkg - newpkg.po.hasdrpm = True - newpkg.repoid = newpkg.po.repo.id + " *" - drpm_count += 1 - else: - if installed and drpm_enabled and not local: - try: - rpm_size += int(newpkg.po.size) - drpm_size += int(newpkg.po.size) - except: - pass + if opts.disablepresto or len(conduit.getDownloadPackages()) == 0: return - -def predownload_hook(conduit): - global drpm_count - global log - - # Set up logging - log = prestoLog.PrestoLog(conduit, LOG_FILE) + conduit.info(2, "Downloading DeltaRPMs:") - pkglist = conduit.getDownloadPackages() - - opts, commands = conduit.getCmdLine() - if not opts.disablepresto and drpm_count > 0: - conduit.info(2, "Downloading DeltaRPMs:") - - # Download deltarpms - problems = prestoDownload.downloadPkgs(conduit, pkglist, log) - - # If 'exitondownloaderror' is on, exit - if conduit.confBool('main', 'exitondownloaderror') and len(problems.keys()) > 0: - errstring = 'Error Downloading Packages:\n' - for key in problems.keys(): - errors = misc.unique(problems[key]) - for error in errors: - errstring += ' %s: %s\n' % (key, error) - raise PluginYumExit(errstring) - - else: - conduit.info(2, "Downloading RPMs:") + # Download deltarpms + problems = downloadPkgs(conduit, pinfo) -def posttrans_hook(conduit): - global rpm_size - global drpm_size - global log - - log.close() - - if rpm_size > 0: - drpm_string = prestoTransaction.format_number(drpm_size) - rpm_string = prestoTransaction.format_number(rpm_size) - - conduit.info(2, "Size of all updates downloaded from Presto-enabled repositories: %s" % drpm_string) - conduit.info(2, "Size of updates that would have been downloaded if Presto wasn't enabled: %s" % rpm_string) - conduit.info(2, "This is a savings of %i percent" % (100 - ((drpm_size * 100) / rpm_size))) + # If 'exitondownloaderror' is on, exit + if conduit.confBool('main', 'exitondownloaderror') and \ + len(problems.keys()) > 0: + errstring = 'Error Downloading DeltaRPMs:\n' + for key in problems.keys(): + errors = yum.misc.unique(problems[key]) + for error in errors: + errstring += ' %s: %s\n' % (key, error) + raise PluginYumExit(errstring) + +# FIXME: would be good to give an idea to people of what they saved |