# author: Jonathan Dieter # # mostly taken from deltarpm.py created by # Lars Herrmann # and modified for Presto by # Ahmed Kamal # # license: GPL (see COPYING file in distribution) # # this module provides a python wrapper around deltarpm tools written by suse # # TODO: catch exceptions wherever possible and raise useful ones ;) # see TODO lines in methods APPLY='/usr/bin/applydeltarpm' import popen2 import string import os class Process: """wrapper class to execute programs and return exitcode and output (stdout and stderr combined)""" def __init__(self, conduit): self.__stdout=None self.__returncode=None self.__command=None self.__args=None self.conduit = conduit def run(self, command, *args): self.__command=command self.__args=args cmdline=command+" "+string.join(args, " ") self.conduit.info(7, '%s.%s: executing %s' % (self.__class__, 'run', cmdline)) pipe = popen2.Popen4(cmdline) self.__stdout=pipe.fromchild.read() retcode = pipe.wait() if os.WIFEXITED(retcode): self.__returncode = os.WEXITSTATUS(retcode) else: self.__returncode = retcode # fallback to old implementation - works better ? #stdoutp = os.popen(cmdline,'r',1) #self.__stdout = stdoutp.read() #retcode = stdoutp.close() #if retcode is None: # self.__returncode = 0 #else: # self.__returncode = retcode def getOutput(self): return self.__stdout def returnCode(self): return self.__returncode class DeltaRpmWrapper: """wrapper around deltarpm binaries - implement methods for applying and verifying delta rpms - raises exceptions if exitcode of binaries was != 0""" def __init__(self, conduit): self.conduit = conduit self.conduit.info(7, '%s.%s: created' % (self.__class__, '__init__')) def apply(self, newrpmfile, deltarpmfile): """wraps execution of applydeltarpm [-r oldrpm] deltarpm newrpm - constructs file names and paths based on given RpmDescription and instance settings for directories""" # TODO: test args for type == instance and __class__ == RpmDescription self.conduit.info(7, '%s.apply(%s,%s)' % (self.__class__, newrpmfile, deltarpmfile)) p=Process(self.conduit) # targetrpm filename p.run(APPLY, deltarpmfile, newrpmfile) if p.returnCode(): # in case of error, raise exception raise Exception("Could not apply deltarpm: %d" % (p.returnCode())) return newrpmfile def verifySequence(self, sequence): """wraps execution of applydeltarpm [-r oldrpm] -s seqfilecontent - constructs file names and paths based on given RpmDescription and instance settings for directories""" self.conduit.info(7, '%s.verify(%s)' % (self.__class__, sequence)) p = Process(self.conduit) p.run(APPLY, '-s', sequence) if p.returnCode(): # in case of error, raise exception raise Exception("Could not verify sequence of deltarpm: %d" % (p.returnCode()))