summaryrefslogtreecommitdiffstats
path: root/yum-presto/presto.py
diff options
context:
space:
mode:
Diffstat (limited to 'yum-presto/presto.py')
-rw-r--r--yum-presto/presto.py464
1 files changed, 350 insertions, 114 deletions
diff --git a/yum-presto/presto.py b/yum-presto/presto.py
index 48db24d..255372c 100644
--- a/yum-presto/presto.py
+++ b/yum-presto/presto.py
@@ -18,36 +18,342 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Copyright 2005 Duke University
# Copyright 2007 Jonathan Dieter
-
-from yum.plugins import TYPE_INTERACTIVE, PluginYumExit
-from yum import config
+# Copyright 2007 Red Hat, Inc. -- Jeremy Katz <katzj@redhat.com>
import os
import sys
+import subprocess
+import gzip
+import thread
+import threading
+import Queue
+try:
+ from cElementTree import iterparse
+except:
+ from xml.etree.cElementTree import iterparse
+
+from yum.plugins import TYPE_CORE, PluginYumExit
+import yum.Errors
+import yum.misc
+from urlgrabber.grabber import URLGrabError
-sys.path.append("/usr/share/presto")
-import deltarpm
-from prestoRepo import PrestoRepository
-from prestomdparser import PrestoMDParser
-import prestoTransaction
-import prestoLog
-import prestoDownload
requires_api_version = '2.1'
-LOG_FILE = "/var/log/presto.log"
-plugin_type = (TYPE_INTERACTIVE,)
+plugin_type = (TYPE_CORE,)
+
+# mapping of repo.id -> PrestoInfo
+pinfo = {}
+
+def verifyDelta(sequence, arch):
+ if subprocess.call(["/usr/bin/applydeltarpm", "-a", arch,
+ "-C", "-s", sequence]):
+ return False
+ return True
+
+def applyDelta(deltarpmfile, newrpmfile, arch):
+ if subprocess.call(["/usr/bin/applydeltarpm", "-a", arch,
+ deltarpmfile, newrpmfile]):
+ return False
+ return True
+
+def reconstruct(conduit, rpmlocal, rpmarch, deltalocal):
+ retlist = ""
+
+ if not applyDelta(deltalocal, rpmlocal, rpmarch):
+ retlist += "Error rebuilding rpm from %s! Will download full package.\n" % os.path.basename(deltalocal)
+ try:
+ os.unlink(rpmlocal)
+ except:
+ pass
+ else:
+ # Check to see whether or not we should keep the drpms
+ # FIXME: Is there any way to see whether or not a Boolean option was not set?
+ if conduit.confBool('main', 'neverkeepdeltas'):
+ delete = True
+ elif conduit.confBool('main', 'keepdeltas'):
+ delete = False
+ elif conduit.getConf().keepcache != 0:
+ delete = False
+ else:
+ delete = True
+
+ if delete:
+ try:
+ os.unlink(deltalocal)
+ except:
+ pass
+ return retlist
+
+class ReconstructionThread(threading.Thread):
+ def __init__(self, queue, lock, run_function):
+ threading.Thread.__init__(self)
+ self.run_function = run_function
+ self.queue = queue
+ self.lock = lock
+ self.can_exit = False
+ self.messages = ""
+
+ def run(self):
+ while True:
+ try:
+ retval = self.queue.get(not self.can_exit)
+ except Queue.Empty:
+ # If we're done with our drpms and no more are coming, let's
+ # blow this joint
+ break
+ if retval != None:
+ messages = apply(self.run_function, retval)
+ if self.can_exit:
+ # If there are not going to be any more new drpms,
+ # send messages directly to conduit
+ conduit = retval[0]
+ if self.messages != "":
+ conduit.info(2, self.messages[:-1])
+ self.messages = ""
+ if messages != "":
+ conduit.info(2, messages[:-1])
+ else:
+ # We may be downloading drpms still, so queue messages
+ self.lock.acquire()
+ self.messages += messages
+ self.lock.release()
+
+
+def getDelta(po, presto, rpmdb):
+ """Does the package have a reasonable delta for us to use?"""
+
+ # local packages don't make sense to use a delta for...
+ if hasattr(po, 'pkgtype') and po.pkgtype == 'local':
+ return None
+ if po.remote_url.startswith("file:/"):
+ # kind of a hack, but file:/ repos are basically local
+ return None
+
+ # if there's not presto info for the repo, we don't have a delta for
+ # the package
+ if not presto.has_key(po.repo.id):
+ return None
+ deltainfo = presto[po.repo.id]
+
+ # any deltas for the new package in the repo?
+ nevra = "%s-%s:%s-%s.%s" %(po.name, po.epoch, po.version,
+ po.release, po.arch)
+ if not deltainfo.has_key(nevra):
+ return None
+ deltas = deltainfo[nevra]
+
+ # check to see if we've already got the full package
+ local = po.localPkg()
+ if os.path.exists(local):
+ cursize = os.stat(local)[6]
+ totsize = long(po.size)
+ if po.verifyLocalPkg(): # we've got it.
+ return None
+ if cursize < totsize: # we have part of the file; do a reget
+ return None
+ os.unlink(local)
+
+ # did we have a previous package of the same arch installed?
+ installed = rpmdb.searchNevra(po.name, None, None, None, po.arch)
+ if len(installed) == 0:
+ return None
+
+ # now, let's see if there's a delta for us...
+ bestdelta = None
+
+ for oldpo in installed:
+ evr = "%s:%s-%s" %(oldpo.epoch, oldpo.version, oldpo.release)
+ if not deltas.has_key(evr):
+ continue
+ delta = deltas[evr]
+
+ # we just want to use the smallest delta
+ if bestdelta and delta['size'] >= bestdelta['size']:
+ continue
+
+ if not verifyDelta(delta['sequence'], po.arch):
+ continue
+
+ bestdelta = delta
+
+ return bestdelta
+
+
+def downloadPkgs(conduit, presto):
+ """download list of package objects handed to you, return errors"""
+
+ errors = {}
+ def adderror(po, msg):
+ errors.setdefault(po, []).append(msg)
+
+ # Set up thread for applying drpms
+ queue = Queue.Queue(0)
+ lock = thread.allocate_lock()
+ curthread = ReconstructionThread(queue, lock, reconstruct)
+ curthread.start()
+
+ remote_pkgs = []
+
+ # see which deltas we need to download; if the delta is already
+ # downloaded, we can start it reconstructing in the background
+ for po in conduit.getDownloadPackages():
+ delta = getDelta(po, presto, conduit.getRpmDB())
+ if delta is None:
+ continue
+
+ # verify the delta if it already exists
+ deltadir = os.path.join(po.repo.cachedir, 'deltas')
+ if not os.path.isdir(deltadir):
+ try:
+ os.mkdir(deltadir)
+ except OSError:
+ continue
+ deltapath = os.path.join(deltadir,
+ os.path.basename(delta['filename']))
+ if os.path.exists(deltapath):
+ try:
+ conduit._base.verifyChecksum(deltapath, delta['checksum_type'],
+ delta['checksum'])
+ except URLGrabError, e:
+ if po.repo.cache:
+ raise Errors.RepoError, "Caching enabled and local cache for %s doesn't match checksum" %(deltapath,)
+ else:
+ cursize = os.stat(deltapath)[6]
+ totsize = long(delta['size'])
+ if cursize >= totsize:
+ os.unlink(deltapath)
+
+ remote_packages.append( (po, delta) )
+ else:
+ # Deltarpm is local and good, put it in the rebuild thread.
+ conduit.info(5, "using local copy of deltarpm for %s" % po)
+ queue.put((conduit, po.localpath, po.arch, deltapath))
+ continue
+ else:
+ remote_pkgs.append( (po, delta) )
+
+ # now we need to do downloads
+ i = 0
+ for (po, delta) in remote_pkgs:
+ i += 1
+ # FIXME: verifyChecksum should handle the urlgrabber objects...
+ checkfunc = (lambda fo, csumtype, csum:
+ conduit._base.verifyChecksum(fo.filename, csumtype, csum),
+ (delta['checksum_type'],
+ delta['checksum']), {})
+
+ deltadir = os.path.join(po.repo.cachedir, 'deltas')
+ deltapath = os.path.join(deltadir,
+ os.path.basename(delta['filename']))
+
+ # FIXME: this should be moved into _getFile
+ dirstat = os.statvfs(deltadir)
+ if (dirstat.f_bavail * dirstat.f_bsize) <= long(po.size):
+ adderror(po, 'Insufficient space in download directory %s '
+ 'to download' % (deltadir,))
+ continue
+ try:
+ text = "(%s/%s): %s" %(i, len(remote_pkgs),
+ os.path.basename(delta['filename']))
+ deltafile = po.repo._getFile(url=po.basepath,
+ relative=delta['filename'],
+ local=deltapath,
+ checkfunc=checkfunc,
+ text=text,
+ cache=po.repo.cache)
+ except Errors.RepoError, e:
+ adderror(po, str(e))
+ else:
+ queue.put((conduit, po.localpath, po.arch, deltafile))
+
+ if errors.has_key(po):
+ del errors[po]
+
+ # Check for waiting messages from building thread
+ lock.acquire()
+ if curthread.messages != "":
+ conduit.info(2, curthread.messages[:-1])
+ curthread.messages = ""
+ lock.release()
+
+ conduit.info(2, "Rebuilding rpms from deltarpms")
+
+ # Tell build thread that there are no more drpms and wait for it to exit
+ curthread.can_exit = True
+ queue.put(None)
+ curthread.join()
+
+ if curthread.messages != "":
+ conduit.info(2, curthread.messages[:-1])
+ curthread.messages = ""
+
+ return errors
+
+class DeltaInfo(object):
+ def __init__(self, elem):
+ self.epoch = elem.get("oldepoch")
+ self.version = elem.get("oldversion")
+ self.release = elem.get("oldrelease")
+
+ self.filename = self.sequence = self.size = self.checksum = self.checksum_type = None
+
+ for x in elem.getchildren():
+ if x.tag == "checksum":
+ self.checksum_type = x.get("type")
+ setattr(self, x.tag, x.text)
+
+ def evr(self):
+ return "%s:%s-%s" %(self.epoch, self.version, self.release)
+
+ def __str__(self):
+ return "filename: %s, sequence: %s, size: %s, checksum (%s) = %s" % (self.filename, self.sequence, self.size, self.checksum_type, self.checksum)
+
+ def __getitem__(self, key):
+ return getattr(self, key)
+
+class NewPackage(object):
+ def __init__(self, elem):
+ for prop in ("name", "version", "release", "epoch", "arch"):
+ setattr(self, prop, elem.get(prop))
+
+ self.deltas = {}
+ for child in elem.getchildren():
+ if child.tag != "delta":
+ continue
+ d = DeltaInfo(child)
+ self.deltas[d.evr()] = d
+
+ def nevra(self):
+ return "%s-%s:%s-%s.%s" %(self.name, self.epoch, self.version,
+ self.release, self.arch)
+
+ def __str__(self):
+ return "%s <== %s" % (self.nevra(), self.deltas)
+
+ def has_key(self, key):
+ return self.deltas.has_key(key)
+ def __getitem__(self, key):
+ return self.deltas[key]
+
+class PrestoParser(object):
+ def __init__(self, filename):
+ self.deltainfo = {}
+
+ if filename.endswith(".gz"):
+ fo = gzip.open(filename)
+ else:
+ fo = open(filename, 'rt')
+ for event, elem in iterparse(fo):
+ if elem.tag == "newpackage":
+ p = NewPackage(elem)
+ self.deltainfo[p.nevra()] = p
+
+ def getDeltas(self):
+ return self.deltainfo
-rpm_size = 0
-drpm_size = 0
-drpm_count = 0
-log = None
# Configuration stuff
def config_hook(conduit):
- # Set up repository specific deltarpm url and mirrorlist
- config.RepoConf.deltaurl = config.UrlListOption()
- config.RepoConf.deltamirrorlist = config.UrlOption()
-
# Add --disable-presto option
parser = conduit.getOptParser()
parser.add_option('', '--disablepresto', dest='disablepresto',
@@ -58,106 +364,36 @@ def config_hook(conduit):
def postreposetup_hook(conduit):
opts, commands = conduit.getCmdLine()
if not opts.disablepresto:
- conduit.info(2, 'Setting up Presto')
+ conduit.info(2, 'Setting up and reading Presto delta metadata')
for active_repo in conduit.getRepos().listEnabled():
- p_repo = PrestoRepository(active_repo, conduit)
- p_repo.setup(conduit.getConf().cache)
-
- conduit.info(2, 'Reading Presto metadata in from local files')
- for active_repo in conduit.getRepos().listEnabled():
- xml = active_repo.p_repo.getPrestoXML()
- if active_repo.p_repo.enabled:
- xmldata = active_repo.p_repo.repoXML.getData('deltas')
- (ctype, csum) = xmldata.checksum
- parser = PrestoMDParser(xml)
- active_repo.p_repo.deltalist = parser.getDeltaList()
+ try:
+ deltamd = active_repo.retrieveMD("prestodelta")
+ except yum.Errors.RepoMDError:
+ conduit.info(2, "No Presto metadata available for %s" %(active_repo,))
+ continue
+ pinfo[active_repo.id] = PrestoParser(deltamd).getDeltas()
else:
conduit.info(5, '--disablepresto specified - Presto disabled')
-
-def postresolve_hook(conduit):
- global rpm_size
- global drpm_size
- global drpm_count
-
+
+def predownload_hook(conduit):
opts, commands = conduit.getCmdLine()
- if not opts.disablepresto:
- # Cycle through packages to see if there's a deltarpm available
- for newpkg in conduit.getTsInfo():
- if newpkg.ts_state != "e":
- (chosen_drpm, installed, local, drpm_enabled) = prestoTransaction.find_available_drpms(conduit, newpkg)
-
- # If a drpm was found, change certain package information so it reflects
- # the drpm, not the rpm.
- if chosen_drpm != None:
- newpkg.po.has_drpm = True
- conduit.info(2, "Found deltarpm update for %s.%s %s:%s-%s" % (newpkg.name, newpkg.arch, newpkg.epoch, newpkg.version, newpkg.release))
- # In yum 3.0.x, this doesn't get defined if you run "yum update x" rather than "yum update"
- rpm_size += int(newpkg.po.size)
- drpm_size += int(chosen_drpm['size'])
- newpkg.po.realpackagesize = newpkg.po.size
- if hasattr(newpkg.po, 'packagesize'):
- newpkg.po.packagesize = chosen_drpm['size']
- else:
- newpkg.po.simple['packagesize'] = chosen_drpm['size']
- newpkg.po.deltasize = chosen_drpm['size']
- newpkg.po.deltarelativepath = chosen_drpm['drpm_filename']
- newpkg.po.deltachecksumtype = chosen_drpm['checksum_type']
- newpkg.po.deltachecksum = chosen_drpm['checksum']
- newpkg.po.deltalocalpath = newpkg.po.repo.deltasdir + "/" + os.path.basename(chosen_drpm['drpm_filename'])
- newpkg.po.to = newpkg
- newpkg.po.hasdrpm = True
- newpkg.repoid = newpkg.po.repo.id + " *"
- drpm_count += 1
- else:
- if installed and drpm_enabled and not local:
- try:
- rpm_size += int(newpkg.po.size)
- drpm_size += int(newpkg.po.size)
- except:
- pass
+ if opts.disablepresto or len(conduit.getDownloadPackages()) == 0:
return
-
-def predownload_hook(conduit):
- global drpm_count
- global log
-
- # Set up logging
- log = prestoLog.PrestoLog(conduit, LOG_FILE)
+ conduit.info(2, "Downloading DeltaRPMs:")
- pkglist = conduit.getDownloadPackages()
-
- opts, commands = conduit.getCmdLine()
- if not opts.disablepresto and drpm_count > 0:
- conduit.info(2, "Downloading DeltaRPMs:")
-
- # Download deltarpms
- problems = prestoDownload.downloadPkgs(conduit, pkglist, log)
-
- # If 'exitondownloaderror' is on, exit
- if conduit.confBool('main', 'exitondownloaderror') and len(problems.keys()) > 0:
- errstring = 'Error Downloading Packages:\n'
- for key in problems.keys():
- errors = misc.unique(problems[key])
- for error in errors:
- errstring += ' %s: %s\n' % (key, error)
- raise PluginYumExit(errstring)
-
- else:
- conduit.info(2, "Downloading RPMs:")
+ # Download deltarpms
+ problems = downloadPkgs(conduit, pinfo)
-def posttrans_hook(conduit):
- global rpm_size
- global drpm_size
- global log
-
- log.close()
-
- if rpm_size > 0:
- drpm_string = prestoTransaction.format_number(drpm_size)
- rpm_string = prestoTransaction.format_number(rpm_size)
-
- conduit.info(2, "Size of all updates downloaded from Presto-enabled repositories: %s" % drpm_string)
- conduit.info(2, "Size of updates that would have been downloaded if Presto wasn't enabled: %s" % rpm_string)
- conduit.info(2, "This is a savings of %i percent" % (100 - ((drpm_size * 100) / rpm_size)))
+ # If 'exitondownloaderror' is on, exit
+ if conduit.confBool('main', 'exitondownloaderror') and \
+ len(problems.keys()) > 0:
+ errstring = 'Error Downloading DeltaRPMs:\n'
+ for key in problems.keys():
+ errors = yum.misc.unique(problems[key])
+ for error in errors:
+ errstring += ' %s: %s\n' % (key, error)
+ raise PluginYumExit(errstring)
+
+# FIXME: would be good to give an idea to people of what they saved