# author: Jonathan Dieter
#
# mostly taken from mdparser.py (part of yum) with a few minor modifications
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Library General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
# Copyright 2005 Duke University
# Portions copyright 2007 Jonathan Dieter
"""Parse XML metadata that lists the deltas available for packages.

The document root must be a ``metadata`` element in the
``http://linux.duke.edu/metadata/common`` namespace.  Its children are read
as packages (``name``, ``arch``, ``version``) carrying a ``deltas`` element
with one ``oldrpm`` child per available delta.
"""

import gzip

try:
    from cElementTree import iterparse
except ImportError:
    try:
        from xml.etree.cElementTree import iterparse
    except ImportError:
        # xml.etree.cElementTree no longer exists on Python 3.9+.
        from xml.etree.ElementTree import iterparse

try:
    # Kept importable from this module for backward compatibility.
    from cStringIO import StringIO
except ImportError:
    from io import StringIO

# Fully qualified tag of the only supported document root.
_METADATA_TAG = '{http://linux.duke.edu/metadata/common}metadata'


def _parse_epoch(attrib, default):
    """Return ``int(attrib['epoch'])``, or *default* if missing or invalid."""
    try:
        return int(attrib['epoch'])
    except (KeyError, ValueError):
        return default


class PrestoMDParser:
    """Incremental reader for a (possibly gzip-compressed) metadata file.

    The constructor opens *filename* (through ``gzip`` when the name ends in
    ``.gz``), reads the first parse event and picks the entry class matching
    the root tag.  ``ValueError`` is raised for an unknown root tag.
    """

    def __init__(self, filename):
        # Set up mapping of meta types to handler classes
        handlers = {
            _METADATA_TAG: DeltasEntry,
        }

        self.total = None
        self.count = 0
        self._handlercls = None
        self._fh = None
        self.reader = iter(())

        # Binary mode lets the XML parser deal with the document encoding.
        if filename.endswith('.gz'):
            self._fh = gzip.open(filename, 'rb')
        else:
            self._fh = open(filename, 'rb')

        ready = False
        try:
            # Read in type and set the node handler
            self.reader = iter(iterparse(self._fh, events=('start', 'end')))
            event, elem = next(self.reader)
            self._handlercls = handlers.get(elem.tag, None)
            if not self._handlercls:
                raise ValueError('Unknown repodata type "%s" in %s' % (
                    elem.tag, filename))
            ready = True
        finally:
            # Do not leak the file handle when construction fails.
            if not ready:
                self._close()

    def _close(self):
        """Close the underlying file (if still open) and drop the reader."""
        fh, self._fh = self._fh, None
        self.reader = iter(())
        if fh is not None:
            fh.close()

    def getDeltaList(self):
        """Parse the rest of the document and return its entry object.

        Returns an instance of the handler class chosen in the constructor,
        built from the fully parsed root element, or ``None`` when the end
        of the root element is never reached (including any call after the
        first one).  The underlying file is closed before returning.
        """
        if self._fh is None:
            return None
        try:
            for event, elem in self.reader:
                if event == 'end' and elem.tag == _METADATA_TAG:
                    return self._handlercls(elem)
            return None
        finally:
            self._close()


class BaseEntry:
    """Read-only, dict-like view over the parsed properties in ``self._p``."""

    def __init__(self, elem):
        self._p = {}

    def __getitem__(self, k):
        return self._p[k]

    def __contains__(self, k):
        return k in self._p

    def keys(self):
        """Return the property names as a list."""
        return list(self._p.keys())

    def values(self):
        """Return the property values as a list."""
        return list(self._p.values())

    def has_key(self, k):
        """Return True if property *k* exists."""
        return k in self._p

    def __str__(self):
        """Return one ``key=value`` line per property, sorted by key."""
        text = u''.join([u'%s=%s\n' % (k, self[k])
                         for k in sorted(self.keys())])
        if str is bytes:
            # Python 2: __str__ has to return a byte string.
            return text.encode('utf8')
        return text

    def _bn(self, qn):
        """Strip a leading ``{namespace}`` from the qualified name *qn*."""
        if qn.find('}') == -1:
            return qn
        return qn.split('}')[1]

    def _prefixprops(self, elem, prefix):
        """Return elem's attributes keyed as ``<prefix>_<bare attr name>``."""
        ret = {}
        for key in elem.attrib.keys():
            ret[prefix + '_' + self._bn(key)] = elem.attrib[key]
        return ret


class DeltasEntry(BaseEntry):
    """Mapping of delta keys to the details of each delta.

    Keys have the form ``<new>!!<old>`` where both halves are
    ``name*arch*epoch*ver*rel``.  Each value is a dict that may hold
    ``sequence``, ``drpm_filename``, ``size``, ``checksum`` and
    ``checksum_type``, all as strings taken from the document.

    Within a package, ``name``, ``arch`` and ``version`` are expected before
    ``deltas``; within an ``oldrpm``, ``version`` is expected before the
    value elements.  An ``oldrpm`` version without ``epoch`` or ``ver``
    inherits them from its package.  The element passed in is cleared once
    it has been processed.
    """

    def __init__(self, deltas):
        BaseEntry.__init__(self, deltas)
        for elem in deltas:
            # All per-package state is reset here so that nothing leaks
            # from the previous package into this one.
            pkg = {}
            pkg_attrib = {}
            pkg_epoch = 0
            key1 = ""
            for child in elem:
                name = self._bn(child.tag)
                if name in ('name', 'arch'):
                    pkg[name] = child.text
                elif name == 'version':
                    pkg_attrib = child.attrib
                    pkg_epoch = _parse_epoch(pkg_attrib, 0)
                    key1 = "%s*%s*%i*%s*%s" % (pkg['name'], pkg['arch'],
                                               pkg_epoch, pkg_attrib['ver'],
                                               pkg_attrib['rel'])
                elif name == 'deltas':
                    for oldrpm in child:
                        self._add_oldrpm(oldrpm, key1, pkg, pkg_epoch,
                                         pkg_attrib)
        deltas.clear()

    def _add_oldrpm(self, oldrpm, key1, pkg, pkg_epoch, pkg_attrib):
        """Record the delta described by one ``oldrpm`` element."""
        p = self._p
        old = {}
        key = None
        for node in oldrpm:
            name = self._bn(node.tag)
            if name in ('name', 'arch'):
                old[name] = node.text
            elif name == 'version':
                attrib = node.attrib
                epoch = _parse_epoch(attrib, pkg_epoch)
                if 'ver' in attrib:
                    ver = attrib['ver']
                else:
                    ver = pkg_attrib['ver']
                # Name and arch default to those of the new package.
                for field in ('name', 'arch'):
                    if field not in old:
                        old[field] = pkg[field]
                key2 = "%s*%s*%i*%s*%s" % (old['name'], old['arch'],
                                           epoch, ver, attrib['rel'])
                key = "%s!!%s" % (key1, key2)
                p[key] = {}
            elif name in ('sequence', 'drpm_filename', 'size', 'checksum'):
                if key is None:
                    raise KeyError('<%s> precedes <version> in <oldrpm>'
                                   % name)
                p[key][name] = node.text
                if name == 'checksum':
                    p[key]["%s_type" % name] = node.attrib['type']


def test():
    """Parse the file named by ``sys.argv[1]`` and print what was read."""
    import sys

    parser = PrestoMDParser(sys.argv[1])

    deltalist = parser.getDeltaList()

    print('-' * 40)
    print(deltalist)

    print('read: %s deltarpms ' % (len(deltalist.keys())))


# Manual command-line check, not a unit test: keep test collectors
# (nose, pytest) from picking it up because of its name.
test.__test__ = False


if __name__ == '__main__':
    test()