From bdfb4037667c6af7d0537f9bc36b33d55d86e059 Mon Sep 17 00:00:00 2001 From: Jonathan Dieter Date: Tue, 3 Apr 2007 16:26:09 +0300 Subject: Add Ahmed's patch to rebuild rpms in separate thread. Complete rebuild of createprestorepo Signed-off-by: Jonathan Dieter --- yum-presto/shared/prestoDownload.py | 107 +++++++++++++++++++++--------------- yum-presto/shared/prestoThread.py | 52 ++++++++++++++++++ 2 files changed, 116 insertions(+), 43 deletions(-) create mode 100644 yum-presto/shared/prestoThread.py (limited to 'yum-presto/shared') diff --git a/yum-presto/shared/prestoDownload.py b/yum-presto/shared/prestoDownload.py index 340ad1c..f21d409 100644 --- a/yum-presto/shared/prestoDownload.py +++ b/yum-presto/shared/prestoDownload.py @@ -21,6 +21,9 @@ from yum import Errors from yum import types from urlgrabber.grabber import URLGrabError import deltarpm +import prestoThread +import Queue +import thread def verifyDelta(fo, po, conduit, raiseError): """verifies the deltarpm is what we expect it to be @@ -57,6 +60,41 @@ def verifyChecksum(filename, checksumType, csum): return 0 +def reconstruct(conduit, po): + deltalocal = po.returnSimple('deltalocalpath') + retlist = "" + + drpm = deltarpm.DeltaRpmWrapper(conduit) + + try: + drpm.apply(po.localpath, deltalocal) + except: + retlist += "Error rebuilding rpm from %s! Will download full package.\n" % os.path.basename(deltalocal) + try: + os.unlink(po.localpath) + except: + pass + else: + retlist += "Built %s from deltarpm\n" % (os.path.basename(po.localpath), os.path.basename(deltalocal)) + # Check to see whether or not we should keep the drpms + # FIXME: Is there any way to see whether or not a Boolean option was not set? + if conduit.confBool('main', 'neverkeepdeltas'): + delete = True + elif conduit.confBool('main', 'keepdeltas'): + delete = False + elif conduit.getConf().keepcache != 0: + delete = False + else: + delete = True + + if delete: + try: + os.unlink(deltalocal) + except: + pass + return retlist + + def downloadPkgs(conduit, pkglist): """download list of package objects handed to you, return errors""" @@ -67,10 +105,15 @@ def downloadPkgs(conduit, pkglist): def adderror(po, msg): errors.setdefault(po, []).append(msg) + # Set up thread for applying drpms + queue = Queue.Queue(0) + lock = thread.allocate_lock() + curthread = prestoThread.ReconstructionThread(queue, lock, reconstruct) + curthread.start() + # Check whether drpm is already downloaded repo_cached = False remote_pkgs = [] - rebuild_pkgs = [] for po in conduit.getDownloadPackages(): if hasattr(po, 'has_drpm') and po.has_drpm: po.to.pkgtup = po.to.realpkgtup @@ -89,13 +132,13 @@ def downloadPkgs(conduit, pkglist): if cursize >= totsize: # otherwise keep it around for regetting os.unlink(local) else: - # Deltarpm is local and good, let's put it in the rebuild list + # Deltarpm is local and good, let's put it in the rebuild thread. conduit.info(5, "using local copy of deltarpm for %s" % po) - rebuild_pkgs.append(po) + queue.put((conduit, po)) continue remote_pkgs.append(po) - - # Download deltarpms + + # Download deltarpms and send them to another thread to be rebuilt i = 0 for po in remote_pkgs: i += 1 @@ -116,7 +159,7 @@ def downloadPkgs(conduit, pkglist): except Errors.RepoError, e: adderror(po, str(e)) else: - rebuild_pkgs.append(po) + queue.put((conduit, po)) po.simple['deltalocalpath'] = deltalocal if errors.has_key(po): @@ -129,43 +172,21 @@ def downloadPkgs(conduit, pkglist): del po.simple['realpackagesize'] del po.simple['realrelativepath'] del po.simple['reallocalpath'] + + # Check for waiting messages from building thread + lock.acquire() + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" + lock.release() + + # Tell build thread that there are no more drpms and wait for it to exit + curthread.can_exit = True + curthread.join() + + if curthread.messages != "": + conduit.info(2, curthread.messages[:-1]) + curthread.messages = "" - # Rebuild rpms from downloaded deltarpms - for po in rebuild_pkgs: - deltalocal = po.returnSimple('deltalocalpath') - drpm = deltarpm.DeltaRpmWrapper(conduit) - try: - conduit.info(2, "Building %s from %s" % (os.path.basename(po.localpath), os.path.basename(deltalocal))) - drpm.apply(po.localpath, deltalocal) - except: - conduit.info(2, "Error rebuilding rpm from %s! Will download full package." % os.path.basename(deltalocal)) - try: - os.unlink(po.localpath) - except: - pass - else: - # Set package type to local, so yum doesn't try to download it later - # po.pkgtype = "local" # If we set this, we can't auto-install public keys - # and yum is smart enough to detect the full rpm and - # not redownload it. - - # Check to see whether or not we should keep the drpms - # FIXME: Is there any way to see whether or not a Boolean option was not set? - if conduit.confBool('main', 'neverkeepdeltas'): - delete = True - elif conduit.confBool('main', 'keepdeltas'): - delete = False - elif conduit.getConf().keepcache != 0: - delete = False - else: - delete = True - - if delete: - try: - os.unlink(deltalocal) - except: - pass - return errors - diff --git a/yum-presto/shared/prestoThread.py b/yum-presto/shared/prestoThread.py new file mode 100644 index 0000000..e67db76 --- /dev/null +++ b/yum-presto/shared/prestoThread.py @@ -0,0 +1,52 @@ +# authors: Ahmed Kamal +# Jonathan Dieter +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# Copyright 2007 Ahmed Kamal, Jonathan Dieter + +import threading + +class ReconstructionThread(threading.Thread): + def __init__(self, queue, lock, run_function): + threading.Thread.__init__(self) + self.run_function = run_function + self.queue = queue + self.lock = lock + self.can_exit = False + self.messages = "" + + def run(self): + while True: + try: + retval = self.queue.get(not self.can_exit) + except: + # If we're done with our drpms and no more are coming, let's + # blow this joint + break + if retval != None: + messages = apply(self.run_function, retval) + if self.can_exit: + # If there are not going to be any more new drpms, + # send messages directly to conduit + conduit = retval[0] + if self.messages != "": + conduit.info(2, self.messages[:-1]) + self.messages = "" + conduit.info(2, messages[:-1]) + else: + # We may be downloading drpms still, so queue messages + self.lock.acquire() + self.messages += messages + self.lock.release() -- cgit