From bdfb4037667c6af7d0537f9bc36b33d55d86e059 Mon Sep 17 00:00:00 2001 From: Jonathan Dieter Date: Tue, 3 Apr 2007 16:26:09 +0300 Subject: Add Ahmed's patch to rebuild rpms in separate thread. Complete rebuild of createprestorepo Signed-off-by: Jonathan Dieter --- createprestorepo/Makefile | 6 +- createprestorepo/createprestorepo | 2 + createprestorepo/createprestorepo.py | 308 ---------------------------------- createprestorepo/dumpMetadata.py | 105 ++++++++---- createprestorepo/genprestometadata.py | 37 +--- yum-presto/ChangeLog | 4 + yum-presto/Makefile | 1 + yum-presto/shared/prestoDownload.py | 107 +++++++----- yum-presto/shared/prestoThread.py | 52 ++++++ 9 files changed, 206 insertions(+), 416 deletions(-) create mode 100755 createprestorepo/createprestorepo delete mode 100755 createprestorepo/createprestorepo.py mode change 100644 => 100755 createprestorepo/genprestometadata.py create mode 100644 yum-presto/shared/prestoThread.py diff --git a/createprestorepo/Makefile b/createprestorepo/Makefile index 38fbfc6..c950686 100644 --- a/createprestorepo/Makefile +++ b/createprestorepo/Makefile @@ -2,5 +2,7 @@ clean: rm -f *.pyc *.pyo *~ install: -# mkdir -p $(DESTDIR)/usr/share/createprestorepo -# install -m 644 presto.py $(DESTDIR)/usr/lib/yum-plugins + install -m 755 createprestorepo $(DESTDIR)/usr/bin/createprestorepo + mkdir -p $(DESTDIR)/usr/share/createprestorepo + install -m 755 genprestometadata.py $(DESTDIR)/usr/share/createprestorepo + install -m 755 dumpMetadata.py $(DESTDIR)/usr/share/createprestorepo diff --git a/createprestorepo/createprestorepo b/createprestorepo/createprestorepo new file mode 100755 index 0000000..5334a80 --- /dev/null +++ b/createprestorepo/createprestorepo @@ -0,0 +1,2 @@ +#!/bin/sh +exec /usr/share/createprestorepo/genprestometadata.py "$@" diff --git a/createprestorepo/createprestorepo.py b/createprestorepo/createprestorepo.py deleted file mode 100755 index a217045..0000000 --- a/createprestorepo/createprestorepo.py +++ 
/dev/null @@ -1,308 +0,0 @@ -#!/usr/bin/python -t -# -*- mode: Python; indent-tabs-mode: nil; -*- -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -import errno, os, sys, gzip -import fnmatch, re -import rpmUtils.transaction, rpmUtils.miscutils -import commands, libxml2 -import dumpMetadata -from dumpMetadata import _gzipOpen, getChecksum -#### import Utils - -DEBUG = True -#### Utils.setdebug(DEBUG) - -SUFFIX='drpm' -DRPMWORTHKEEPINGTHRESH=0.5 -DEBUG=0 -REPODATA="repodata" -REPOFILE="presto.xml" -REPOMDFILE="prestomd.xml" -SUM_TYPE="sha" - -def XML_start_newrpm(node, (f, n, e, v, r, a), srcdir_len): - newrpm_node = node.newChild(None, "package", None) - newrpm_node.newProp("type", "rpm") - newrpm_node.newChild(None, "name", n) - newrpm_node.newChild(None, "arch", str(a)) - version = newrpm_node.newChild(None, "version", None) - version.newProp("epoch", str(e)) - version.newProp("ver", str(v)) - version.newProp("rel", str(r)) - deltas = newrpm_node.newChild(None, "deltas", None) - return deltas - -def XML_oldrpm(newrpm_node, drpm_file, oldrpm, newrpm, sequence, size): - (f, n, e, v, r, a) = oldrpm - (nf, nn, ne, nv, nr, na) = newrpm - oldrpm_node = newrpm_node.newChild(None, "oldrpm", None) - checksum = getChecksum(SUM_TYPE, drpm_file) - if n != nn: - oldrpm_node.newChild(None, "name", n) - if a != na: - 
oldrpm_node.newChild(None, "arch", str(a)) - version = oldrpm_node.newChild(None, "version", None) - if e != ne: - version.newProp("epoch", str(e)) - if v != nv: - version.newProp("ver", str(v)) - version.newProp("rel", str(r)) - oldrpm_node.newChild(None, "drpm_filename", drpm_file) - oldrpm_node.newChild(None, "size", str(size)) - oldrpm_node.newChild(None, "sequence", str(sequence)) - cs_node = oldrpm_node.newChild(None, "checksum", str(checksum)) - cs_node.newProp("type", SUM_TYPE) - -def startXML(): - basedoc = libxml2.newDoc("1.0") - baseroot = basedoc.newChild(None, "metadata", None) - basens = baseroot.newNs('http://linux.duke.edu/metadata/common', None) - formatns = baseroot.newNs('http://linux.duke.edu/metadata/rpm', 'rpm') - baseroot.setNs(basens) - return (basedoc, baseroot) - -def endXML(xmldoc, filename, srcdir, compressed=True): - if compressed: - outfile = _gzipOpen("%s%s/%s.gz" % (srcdir, REPODATA, filename), "w") - output = xmldoc.serialize('UTF-8', 1) - outfile.write(output) - outfile.close() - else: - xmldoc.saveFormatFileEnc("%s%s/%s" % (srcdir, REPODATA, filename), 'UTF-8', 1) - xmldoc.freeDoc() - -def repoXML(srcdir): - """generate the repomd.xml file that stores the info on the other files""" - repodoc = libxml2.newDoc("1.0") - reporoot = repodoc.newChild(None, "repomd", None) - repons = reporoot.newNs('http://linux.duke.edu/metadata/repo', None) - reporoot.setNs(repons) - repofilepath = "%s%s/%s" % (srcdir, REPODATA, REPOMDFILE) - filename = "%s%s/%s.gz" % (srcdir, REPODATA, REPOFILE) - filetype = "deltas" - zfo = _gzipOpen(filename, "rb") - uncsum = getChecksum(SUM_TYPE, zfo) - zfo.close() - csum = getChecksum(SUM_TYPE, filename) - timestamp = os.stat(filename)[8] - data = reporoot.newChild(None, 'data', None) - data.newProp('type', filetype) - location = data.newChild(None, 'location', None) - location.newProp('href', "%s/%s.gz" % (REPODATA, REPOFILE)) - checksum = data.newChild(None, 'checksum', csum) - checksum.newProp('type', SUM_TYPE) 
- timestamp = data.newChild(None, 'timestamp', str(timestamp)) - unchecksum = data.newChild(None, 'open-checksum', uncsum) - unchecksum.newProp('type', SUM_TYPE) - endXML(repodoc, REPOMDFILE, srcdir, False) - -def genDeltaRPM(ts, newrpm, oldrpm, is_new_package, srcdir, dstdir, locroot): - (f1,n1,e1,v1,r1,a1) = newrpm - (f2,n2,e2,v2,r2,a2) = oldrpm - hdr = rpmUtils.miscutils.hdrFromPackage(ts,f1) - arch = hdr['arch'] - v12 = "_".join([v1,v2]) - r12 = "_".join([r1,r2]) - deltaRPMName= '%s/%s.%s.%s' % (dstdir, "-".join([n1,v12,r12]), a1, SUFFIX) - if DEBUG: - print "DEBUG " + deltaCommand - # If the drpm doesn't exists, make it, else skip it - if os.path.exists("%s%s" % (srcdir, deltaRPMName)): - dsize = os.path.getsize("%s%s" % (srcdir, deltaRPMName)) - if e1 == e2: - print 'Using pre-generated delta rpm for %s.%s - %s.%s => %s.%s' % (n1, a1, v2, r2, v1, r1) - else: - print 'Using pre-generated delta rpm for %s.%s - %s:%s.%s => %s:%s.%s' % (n1, a1, e2, v2, r2, e1, v1, r1) - # Get checksum - seqfile = open("%s%s.seq" % (srcdir, deltaRPMName), "r") - sequence = seqfile.read()[:-1] - sequence = sequence[sequence.rfind("-")+1:] - seqfile.close() - if is_new_package: - locroot = XML_start_newrpm(locroot, newrpm, len(srcdir)) - is_new_package = False - XML_oldrpm(locroot, deltaRPMName, oldrpm, newrpm, sequence, dsize) - if DEBUG: - print "DEBUG skipping %s" % (deltaRPMName) - elif os.path.exists("%s%s.dontdelta" % (srcdir, deltaRPMName)): - pass - else: - deltaCommand = 'makedeltarpm -s %s%s.seq %s %s %s%s' % (srcdir, deltaRPMName, f2, f1, srcdir, deltaRPMName) - (code, out) = commands.getstatusoutput(deltaCommand) - if code: - #raise Exception("genDeltaRPM: exitcode was %s - Reported Error: %s" % (code, out)) - print "Error genDeltaRPM for %s: exitcode was %s - Reported Error: %s" % (n1, code, out) - - # Get size - dsize = os.path.getsize("%s%s" % (srcdir, deltaRPMName)) - - # Get checksum - seqfile = open("%s%s.seq" % (srcdir, deltaRPMName), "r") - sequence = 
seqfile.read()[:-1] - sequence = sequence[sequence.rfind("-")+1:] - seqfile.close() - - # Check whether or not we should keep the drpm - if not drpmIsWorthKeeping(deltaRPMName, f1, srcdir): - if DEBUG: - print 'deleting %s' % (deltaRPMName) - try: - os.unlink("%s%s" % (srcdir, deltaRPMName)) - except Exception, e: - print "Error deleting deltarpm %s" % (deltaRPMName), str(e) - try: - os.unlink("%s%s.seq" % (srcdir, deltaRPMName)) - except Exception, e: - print "Error deleting checksum %s.seq" % (deltaRPMName), str(e) - f = open("%s%s.dontdelta" % (srcdir, deltaRPMName), "w") - f.close() - else: - if e1 == e2: - print 'Generated delta rpm for %s.%s - %s.%s => %s.%s' % (n1, a1, v2, r2, v1, r1) - else: - print 'Generated delta rpm for %s.%s - %s:%s.%s => %s:%s.%s' % (n1, a1, e2, v2, r2, e1, v1, r1) - - if is_new_package: - locroot = XML_start_newrpm(locroot, newrpm, len(srcdir)) - is_new_package = False - XML_oldrpm(locroot, deltaRPMName, oldrpm, newrpm, sequence, dsize) - return (is_new_package, locroot) - -def drpmIsWorthKeeping(deltaRPMName, newrpm, srcdir): - newsize = os.path.getsize(newrpm) - drpmsize = os.path.getsize("%s%s" % (srcdir, deltaRPMName)) - # Delete the drpm if it's too large - if drpmsize > DRPMWORTHKEEPINGTHRESH * newsize: - return False - return True - -def createPrestoRepo(srcdir, dstdir): - ts = rpmUtils.transaction.initReadOnlyTransaction() - changed = False - - # Create list of .rpm files. - # We don't use "glob", so sub-directories are supported. - print 'Using source dir: %s' % srcdir - print 'Using destination dir: %s' % dstdir - if dstdir[-1] == "/": - dstdir = dstdir[:-1] - srcfiles = [] - for root, dirs, files in os.walk(srcdir): - for f in fnmatch.filter(files,'*.rpm'): - srcfiles.append(os.path.join(root,f)) - if not len(srcfiles): - print ' Nothing found.' 
- return changed - assert srcfiles[0].startswith(srcdir) - - # Check whether dstdir exists, and if it doesn't, create it - if not os.access(dstdir, os.F_OK): - os.makedirs(dstdir, 0755) - elif not os.access(dstdir, os.W_OK): - print 'ERROR: Unable to write to %s' % dstdir - sys.exit(1) - - # Check whether REPODATA exists, and if it doesn't, create it - if not os.access("%s%s" % (srcdir, REPODATA), os.F_OK): - os.makedirs("%s%s" % (srcdir, REPODATA), 0755) - elif not os.access(dstdir, os.W_OK): - print 'ERROR: Unable to write to %s' % REPODATA - sys.exit(1) - - # Create XML document -# xmldoc = libxml2.newDoc("1.0") -# xmlroot = xmldoc - (xmldoc, xmlroot) = startXML() - - # Create map: rpm %name -> list of tuples (filename,name,e,v,r) - newestsrcrpms = {} - for f in srcfiles: - hdr = rpmUtils.miscutils.hdrFromPackage(ts, f) - nm = hdr['name'] + "." + hdr['arch'] - n = hdr['name'] - a = hdr['arch'] - v = hdr['version'] - r = hdr['release'] - e = hdr['epoch'] - if e is None: - e = 0 - newestsrcrpms.setdefault(nm,[]) - newestsrcrpms[nm].append((f,n,e,v,r,a)) - - # Now purge old src.rpm unless their %name matches a white-list pattern. - for l in newestsrcrpms.itervalues(): - x = len(l) - - if x > 1: - def sortByEVR(fnevr1, fnevr2): - (f1,n1,e1,v1,r1,a1) = fnevr1 - (f2,n2,e2,v2,r2,a2) = fnevr2 - rc = rpmUtils.miscutils.compareEVR((e1,v1,r1),(e2,v2,r2)) - if rc == 0: - return 0 - if rc > 0: - return -1 - if rc < 0: - return 1 - - l.sort(sortByEVR) # highest first in list - - # Generate delta rpm - is_new_package = True - locroot = xmlroot - for rpm in l[1:]: - (is_new_package, locroot) = genDeltaRPM(ts, l[0], rpm, is_new_package, srcdir, dstdir, locroot) - - if not len(srcfiles): - print 'WARNING: No .rpms left. Stopping here.' - return changed - - # Write out end of deltas.xml file - endXML(xmldoc, REPOFILE, srcdir, True) - repoXML(srcdir) - - # Examine binary repository directories and remove everything which - # is missing its corresponding src.rpm. 
- return changed - - -def main(bin_rpm_path, delta_rpm_path): - assert rpmUtils.miscutils.compareEVR((1,2,3),(1,2,0)) > 0 - assert rpmUtils.miscutils.compareEVR((0,1,2),(0,1,2)) == 0 - assert rpmUtils.miscutils.compareEVR((1,2,3),(4,0,99)) < 0 - - return createPrestoRepo(bin_rpm_path, delta_rpm_path) - - -if __name__ == '__main__': - if len(sys.argv) < 2: - print 'Usage: %s \n' % os.path.basename(sys.argv[0]) - sys.exit(errno.EINVAL) - bin_rpm_path = sys.argv[1] - delta_rpm_path = sys.argv[2] - - #### cfg = Utils.load_config_module(sys.argv[1]) - - #### Utils.signer_gid_check(cfg.signersgid) - #### os.umask(cfg.signersumask) - - #### for dist in sys.argv[2:]: - #### if not cfg.archdict.has_key(dist): - #### print "No distribution release named '%s' found" % dist - #### sys.exit(errno.EINVAL) - main(bin_rpm_path, delta_rpm_path) - sys.exit(0) diff --git a/createprestorepo/dumpMetadata.py b/createprestorepo/dumpMetadata.py index 2f074da..5ad9cf5 100755 --- a/createprestorepo/dumpMetadata.py +++ b/createprestorepo/dumpMetadata.py @@ -233,7 +233,8 @@ class RpmMetaData: self.pkgid = self.doChecksumCache(fo) fo.seek(0) (self.rangestart, self.rangeend) = byteranges(fo) - fo.close() + fo.seek(self.rangeend) + self._getOldInfo(fo) del fo del fd @@ -243,29 +244,40 @@ class RpmMetaData: else: return self.tagByName('arch') - def _correctVersion(self, vers): - returnvers = [] - vertuple = (None, None, None) - if vers is None: - returnvers.append(vertuple) - return returnvers + def _stringToNEVR(self, string): + i = string.rfind("-", 0, string.rfind("-")-1) + name = string[:i] + (epoch, ver, rel) = self._stringToVersion(string[i+1:]) + return (name, epoch, ver, rel) + + def _getLength(self, in_data): + length = 0 + for val in in_data: + length = length * 256 + length += ord(val) + return length + + def _getOldInfo(self, fo): + try: + compobj = gzip.GzipFile("", "rb", 9, fo) + except: + raise zlibError("Data not stored in gzip format") - if type(vers) is not types.ListType: - if 
vers is not None: - vertuple = self._stringToVersion(vers) - else: - vertuple = (None, None, None) - returnvers.append(vertuple) - else: - for ver in vers: - if ver is not None: - vertuple = self._stringToVersion(ver) - else: - vertuple = (None, None, None) - returnvers.append(vertuple) - return returnvers + if compobj.read(4)[:3] != "DLT": + raise Exception("Not a deltarpm") + + nevr_length = self._getLength(compobj.read(4)) + nevr = compobj.read(nevr_length).strip("\x00") + seq_length = self._getLength(compobj.read(4)) + seq = compobj.read(seq_length) + hex_seq = "" + for char in seq: + hex_seq += str("%02x" % ord(char)) + self.oldnevrstring = nevr + self.oldnevr = self._stringToNEVR(nevr) + self.sequence = hex_seq + compobj.close() - def _stringToVersion(self, strng): i = strng.find(':') if i != -1: @@ -400,7 +412,7 @@ class RpmMetaData: if not self.options['cache']: return getChecksum(self.options['sumtype'], fo) - csumtag = '%s-%s' % (self.hdr['name'] , self.hdr[rpm.RPMTAG_SHA1HEADER]) + csumtag = os.path.basename(self.relativepath) + ".cache" csumfile = '%s/%s' % (self.options['cachedir'], csumtag) if os.path.exists(csumfile) and self.mtime <= os.stat(csumfile)[8]: csumo = open(csumfile, 'r') @@ -417,19 +429,44 @@ class RpmMetaData: -def generateXML(doc, node, formatns, drpmObj, sumtype): +def generateXML(doc, node, formatns, drpmObj, sumtype, pkgDeltas): """takes an xml doc object and a package metadata entry node, populates a package node with the md information""" - ns = node.ns() - pkgNode = node.newChild(None, "package", None) - pkgNode.newProp('type', 'rpm') - pkgNode.newChild(None, 'name', drpmObj.tagByName('name')) - pkgNode.newChild(None, 'arch', drpmObj.arch()) - version = pkgNode.newChild(None, 'version', None) - version.newProp('epoch', str(drpmObj.epoch())) - version.newProp('ver', str(drpmObj.tagByName('version'))) - version.newProp('rel', str(drpmObj.tagByName('release'))) - return pkgNode + name = drpmObj.tagByName('name') + arch = 
drpmObj.arch() + epoch = str(drpmObj.epoch()) + ver = str(drpmObj.tagByName('version')) + rel = str(drpmObj.tagByName('release')) + if not pkgDeltas.has_key('%s-%s:%s-%s.%s' % (name, epoch, ver, rel, arch)): + pkgNode = node.newChild(None, "package", None) + pkgNode.newProp('type', 'rpm') + pkgNode.newChild(None, 'name', name) + pkgNode.newChild(None, 'arch', arch) + version = pkgNode.newChild(None, 'version', None) + version.newProp('epoch', epoch) + version.newProp('ver', ver) + version.newProp('rel', rel) + deltas = pkgNode.newChild(None, 'deltas', None) + pkgDeltas['%s-%s:%s-%s.%s' % (name, epoch, ver, rel, arch)] = deltas + else: + deltas = pkgDeltas['%s-%s:%s-%s.%s' % (name, epoch, ver, rel, arch)] + (oldname, oldepoch, oldver, oldrel) = drpmObj.oldnevr + drpmNode = deltas.newChild(None, "oldrpm", None) + if name != oldname: + drpmNode.newChild(None, 'name', oldname) + # oldrpm arch is not stored in drpm, so we can only work within same arch + version = drpmNode.newChild(None, 'version', None) + if epoch != oldepoch: + version.newProp('epoch', oldepoch) + if ver != oldver: + version.newProp('ver', oldver) + version.newProp('rel', oldrel) + drpmNode.newChild(None, 'drpm_filename', drpmObj.relativepath) + drpmNode.newChild(None, 'size', str(drpmObj.size)) + drpmNode.newChild(None, 'sequence', '%s-%s' % (drpmObj.oldnevrstring, drpmObj.sequence)) + checksum = drpmNode.newChild(None, 'checksum', drpmObj.pkgid) + checksum.newProp('type', drpmObj.options['sumtype']) + def repoXML(node, cmds): """generate the repomd.xml file that stores the info on the other files""" diff --git a/createprestorepo/genprestometadata.py b/createprestorepo/genprestometadata.py old mode 100644 new mode 100755 index 402af25..9d7a7a8 --- a/createprestorepo/genprestometadata.py +++ b/createprestorepo/genprestometadata.py @@ -62,6 +62,7 @@ class MetaDataGenerator: self.cmds = cmds self.ts = rpm.TransactionSet() self.pkgcount = 0 + self.newrpms = {} self.files = [] def getFileList(self, 
basepath, path, ext, filelist): @@ -87,7 +88,7 @@ class MetaDataGenerator: filelist.append(os.path.join(relativepath, d)) else: raise "basepath '%s' not found in path '%s'" % (basepath, totalpath) - + return filelist @@ -108,7 +109,7 @@ class MetaDataGenerator: """all the heavy lifting for the package metadata""" # rpms we're going to be dealing with - files = self.getFileList(self.cmds['basedir'], directory, '.dpm', []) + files = self.getFileList(self.cmds['basedir'], directory, '.drpm', []) files = self.trimRpms(files) self.pkgcount = len(files) self.openMetadataDocs() @@ -150,18 +151,10 @@ class MetaDataGenerator: continue else: try: - node = dumpMetadata.generateXML(self.prestodoc, self.prestoroot, self.formatns, mdobj, self.cmds['sumtype']) + dumpMetadata.generateXML(self.prestodoc, self.prestoroot, self.formatns, mdobj, self.cmds['sumtype'], self.newrpms) except dumpMetadata.MDError, e: errorprint(_('\nAn error occurred creating presto metadata: %s') % e) continue - else: - output = node.serialize('UTF-8', self.cmds['pretty']) - self.prestofile.write(output) - self.prestofile.write('\n') - node.unlinkNode() - node.freeNode() - del node - return current @@ -172,7 +165,10 @@ class MetaDataGenerator: # save them up to the tmp locations: if not self.cmds['quiet']: print _('Saving Presto metadata') - self.prestofile.write('\n') + output = self.prestoroot.serialize('UTF-8', self.cmds['pretty']) + output = output[output.find("\n")+1:] + self.prestofile.write(output) + self.prestofile.write("\n") self.prestofile.close() self.prestodoc.freeDoc() @@ -357,16 +353,6 @@ def parseArgs(args): directory = '.' if not cmds['outputdir']: cmds['outputdir'] = cmds['basedir'] - if cmds['groupfile']: - a = cmds['groupfile'] - if cmds['split']: - a = os.path.join(cmds['basedir'], directory, cmds['groupfile']) - elif not os.path.isabs(a): - a = os.path.join(cmds['basedir'], cmds['groupfile']) - if not os.path.exists(a): - errorprint(_('Error: groupfile %s cannot be found.' 
% a)) - usage() - cmds['groupfile'] = a if cmds['cachedir']: a = cmds ['cachedir'] if not os.path.isabs(a): @@ -467,13 +453,6 @@ def main(args): errorprint(_('Error was %s') % e) sys.exit(1) - # Clean up any update metadata - mdpath = os.path.join(cmds['basedir'], cmds['olddir'], cmds['update-info-dir']) - if os.path.isdir(mdpath): - for file in os.listdir(mdpath): - os.remove(os.path.join(mdpath, file)) - os.rmdir(mdpath) - #XXX: fix to remove tree as we mung basedir try: diff --git a/yum-presto/ChangeLog b/yum-presto/ChangeLog index cd39780..4612d88 100644 --- a/yum-presto/ChangeLog +++ b/yum-presto/ChangeLog @@ -1,3 +1,7 @@ +* Tue Apr 3 2007 Jonathan Dieter - 0.3.4 + - Add patch from Ahmed Kamal to put rebuilding of rpms into a + different thread + * Fri Mar 30 2007 Jonathan Dieter - 0.3.3 - Change to how presto.xml.gz stores sequence. An attempt to save space doesn't always work, so get rid of it. diff --git a/yum-presto/Makefile b/yum-presto/Makefile index a335109..8826b30 100644 --- a/yum-presto/Makefile +++ b/yum-presto/Makefile @@ -11,6 +11,7 @@ install: install -m 644 shared/prestoRepo.py $(DESTDIR)/usr/share/presto install -m 644 shared/prestomdparser.py $(DESTDIR)/usr/share/presto install -m 644 shared/prestoTransaction.py $(DESTDIR)/usr/share/presto + install -m 644 shared/prestoThread.py $(DESTDIR)/usr/share/presto install -m 644 shared/prestoLog.py $(DESTDIR)/usr/share/presto install -m 644 shared/prestoDownload.py $(DESTDIR)/usr/share/presto install -m 644 shared/deltarpm.py $(DESTDIR)/usr/share/presto diff --git a/yum-presto/shared/prestoDownload.py b/yum-presto/shared/prestoDownload.py index 340ad1c..f21d409 100644 --- a/yum-presto/shared/prestoDownload.py +++ b/yum-presto/shared/prestoDownload.py @@ -21,6 +21,9 @@ from yum import Errors from yum import types from urlgrabber.grabber import URLGrabError import deltarpm +import prestoThread +import Queue +import thread def verifyDelta(fo, po, conduit, raiseError): """verifies the deltarpm is what we 
expect it to be @@ -57,6 +60,41 @@ def verifyChecksum(filename, checksumType, csum): return 0 +def reconstruct(conduit, po): + deltalocal = po.returnSimple('deltalocalpath') + retlist = "" + + drpm = deltarpm.DeltaRpmWrapper(conduit) + + try: + drpm.apply(po.localpath, deltalocal) + except: + retlist += "Error rebuilding rpm from %s! Will download full package.\n" % os.path.basename(deltalocal) + try: + os.unlink(po.localpath) + except: + pass + else: + retlist += "Built %s from deltarpm\n" % (os.path.basename(po.localpath), os.path.basename(deltalocal)) + # Check to see whether or not we should keep the drpms + # FIXME: Is there any way to see whether or not a Boolean option was not set? + if conduit.confBool('main', 'neverkeepdeltas'): + delete = True + elif conduit.confBool('main', 'keepdeltas'): + delete = False + elif conduit.getConf().keepcache != 0: + delete = False + else: + delete = True + + if delete: + try: + os.unlink(deltalocal) + except: + pass + return retlist + + def downloadPkgs(conduit, pkglist): """download list of package objects handed to you, return errors""" @@ -67,10 +105,15 @@ def downloadPkgs(conduit, pkglist): def adderror(po, msg): errors.setdefault(po, []).append(msg) + # Set up thread for applying drpms + queue = Queue.Queue(0) + lock = thread.allocate_lock() + curthread = prestoThread.ReconstructionThread(queue, lock, reconstruct) + curthread.start() + # Check whether drpm is already downloaded repo_cached = False remote_pkgs = [] - rebuild_pkgs = [] for po in conduit.getDownloadPackages(): if hasattr(po, 'has_drpm') and po.has_drpm: po.to.pkgtup = po.to.realpkgtup @@ -89,13 +132,13 @@ def downloadPkgs(conduit, pkglist): if cursize >= totsize: # otherwise keep it around for regetting os.unlink(local) else: - # Deltarpm is local and good, let's put it in the rebuild list + # Deltarpm is local and good, let's put it in the rebuild thread. 
conduit.info(5, "using local copy of deltarpm for %s" % po) - rebuild_pkgs.append(po) + queue.put((conduit, po)) continue remote_pkgs.append(po) - - # Download deltarpms + + # Download deltarpms and send them to another thread to be rebuilt i = 0 for po in remote_pkgs: i += 1 @@ -116,7 +159,7 @@ def downloadPkgs(conduit, pkglist): except Errors.RepoError, e: adderror(po, str(e)) else: - rebuild_pkgs.append(po) + queue.put((conduit, po)) po.simple['deltalocalpath'] = deltalocal if errors.has_key(po): @@ -129,43 +172,21 @@ def downloadPkgs(conduit, pkglist): del po.simple['realpackagesize'] del po.simple['realrelativepath'] del po.simple['reallocalpath'] + + # Check for waiting messages from building thread + lock.acquire() + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" + lock.release() + + # Tell build thread that there are no more drpms and wait for it to exit + curthread.can_exit = True + curthread.join() + + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" - # Rebuild rpms from downloaded deltarpms - for po in rebuild_pkgs: - deltalocal = po.returnSimple('deltalocalpath') - drpm = deltarpm.DeltaRpmWrapper(conduit) - try: - conduit.info(2, "Building %s from %s" % (os.path.basename(po.localpath), os.path.basename(deltalocal))) - drpm.apply(po.localpath, deltalocal) - except: - conduit.info(2, "Error rebuilding rpm from %s! Will download full package." % os.path.basename(deltalocal)) - try: - os.unlink(po.localpath) - except: - pass - else: - # Set package type to local, so yum doesn't try to download it later - # po.pkgtype = "local" # If we set this, we can't auto-install public keys - # and yum is smart enough to detect the full rpm and - # not redownload it. - - # Check to see whether or not we should keep the drpms - # FIXME: Is there any way to see whether or not a Boolean option was not set? 
- if conduit.confBool('main', 'neverkeepdeltas'): - delete = True - elif conduit.confBool('main', 'keepdeltas'): - delete = False - elif conduit.getConf().keepcache != 0: - delete = False - else: - delete = True - - if delete: - try: - os.unlink(deltalocal) - except: - pass - return errors - diff --git a/yum-presto/shared/prestoThread.py b/yum-presto/shared/prestoThread.py new file mode 100644 index 0000000..e67db76 --- /dev/null +++ b/yum-presto/shared/prestoThread.py @@ -0,0 +1,52 @@ +# authors: Ahmed Kamal +# Jonathan Dieter +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. 
# Copyright 2007 Ahmed Kamal, Jonathan Dieter

import threading

try:
    from Queue import Empty as _QueueEmpty  # Python 2 module name
except ImportError:
    from queue import Empty as _QueueEmpty  # Python 3 module name


class ReconstructionThread(threading.Thread):
    """Worker thread that feeds queued jobs to ``run_function``.

    Every job taken from ``queue`` is a tuple of positional arguments for
    ``run_function``.  The first element of the tuple must offer an
    ``info(level, text)`` method, because it is used to report messages once
    ``can_exit`` has been set.  ``None`` entries in the queue are ignored.

    ``run_function`` has to return the text to report.  The last character of
    that text is dropped when it is reported, so it is expected to end with a
    newline.

    Attributes:
        run_function: callable invoked as ``run_function(*job)``.
        queue: queue the jobs are read from.
        lock: lock guarding ``messages`` while jobs may still be arriving.
        can_exit: set to True by the producer after its last ``put``; the
            thread then drains the queue and terminates.
        messages: text collected while ``can_exit`` was still False; the
            producer is expected to read and reset it while holding ``lock``.
        poll_interval: seconds to wait for a job before looking at
            ``can_exit`` again.
    """

    # A blocking get() without a timeout never notices that ``can_exit`` was
    # switched on while the queue is empty, which makes join() hang forever.
    poll_interval = 0.1

    def __init__(self, queue, lock, run_function):
        threading.Thread.__init__(self)
        self.run_function = run_function
        self.queue = queue
        self.lock = lock
        self.can_exit = False
        self.messages = ""

    def run(self):
        """Process jobs until ``can_exit`` is set and the queue is empty."""
        while True:
            # Read the flag once per round.  Leaving is only allowed after a
            # non-blocking get() that was issued *after* the flag was seen,
            # otherwise a job queued just before the flag flipped is lost.
            finishing = self.can_exit
            try:
                if finishing:
                    job = self.queue.get(False)
                else:
                    job = self.queue.get(True, self.poll_interval)
            except _QueueEmpty:
                if finishing:
                    # If we're done with our drpms and no more are coming,
                    # let's blow this joint
                    break
                continue

            if job is None:
                continue

            messages = self.run_function(*job)
            if self.can_exit:
                # If there are not going to be any more new drpms,
                # send messages directly to conduit
                conduit = job[0]
                if self.messages != "":
                    conduit.info(2, self.messages[:-1])
                    self.messages = ""
                conduit.info(2, messages[:-1])
            else:
                # We may be downloading drpms still, so queue messages
                self.lock.acquire()
                try:
                    self.messages += messages
                finally:
                    self.lock.release()