1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
|
import rpm
import string
from constants import *
# set DB_PRIVATE to make rpm happy
rpm.addMacro("__dbi_cdb", "create private mpool mp_mmapsize=16Mb mp_size=1Mb")
def dEBUG(str):
print str
def addNewPackageToUpgSet(pkgDict, pkg):
"""Check to see if there's already a pkg by the name of pkg already
in our dictionary. If not, add this one. If there is, see if
this one is 'newer' or has a 'better' arch."""
name = pkg[rpm.RPMTAG_NAME]
arch = pkg[rpm.RPMTAG_ARCH]
if not pkgDict.has_key((name, arch)):
# nope
pkgDict[(name,arch)] = pkg
else:
# first check version
val = rpm.versionCompare(pkgDict[(name,arch)], pkg)
if val < 0:
# we're newer, add this one
pkgDict[(name,arch)] = pkg
def comparePackageForUpgrade(updDict, h, pkg):
val = rpm.versionCompare(h, pkg)
if (val > 0):
# dEBUG("found older version of %(name)s %(arch)s" % h)
pass
elif (val < 0):
# dEBUG("found newer version of %(name)s %(arch)s" % h)
# check if we already have this package in our dictionary
addNewPackageToUpgSet(updDict, pkg)
else:
# dEBUG("found same verison of %(name)s %(arch)s" % h)
pass
def findBestArch(arch, archlist):
bestarch = arch
for availarch in archlist:
newscore = rpm.archscore(availarch)
oldscore = rpm.archscore(bestarch)
# Both unsupported
if newscore == 0 and oldscore == 0:
continue
# If old arch is better or now unsupported and we have a better one
if oldscore < newscore:
if oldscore == 0:
bestarch = availarch
# If new arch is better but not unsupported
if newscore < oldscore:
if newscore != 0:
bestarch = availarch
if oldscore = newscore:
pass
return bestarch
def getAvailPackages(hdrlist):
# go through and figure out which packages in the header list are
# actually applicable for our architecture
pkgDict = {}
nameDict = {}
for h in hdrlist:
score1 = rpm.archscore(h[rpm.RPMTAG_ARCH])
if (score1):
name = h[rpm.RPMTAG_NAME]
arch = h[rpm.RPMTAG_ARCH]
pkgDict[(name,arch)] = h
if nameDict.has_key(name):
nameDict[name].append(arch)
else:
nameDict[name] = [ arch ]
return (pkgDict, nameDict)
def getInstalledPackages(dbPath='/'):
pkgDict = {}
nameDict = {}
ts = rpm.TransactionSet(dbPath)
ts.setVSFlags(~(rpm.RPMVSF_NORSA|rpm.RPMVSF_NODSA|rpm.RPMVSF_NOMD5))
mi = ts.dbMatch()
for h in mi:
name = h[rpm.RPMTAG_NAME]
arch = h[rpm.RPMTAG_ARCH]
pkgDict[(name,arch)] = h
if nameDict.has_key(name):
nameDict[name].append(arch)
else:
nameDict[name] = [ arch ]
return (pkgDict, nameDict)
def findpackageset(hdrlist, dbPath='/'):
instDict = {}
availDict = {}
updDict = {}
# dicts for name : [archlist]
availNames = {}
instNames = {}
ts = rpm.TransactionSet(dbPath)
ts.setVSFlags(~(rpm.RPMVSF_NORSA|rpm.RPMVSF_NODSA|rpm.RPMVSF_NOMD5))
(availDict, availNames) = getAvailPackages(hdrlist)
(instDict, instNames) = getInstalledPackages(dbPath=dbPath)
hdlist = availDict.values()
# loop through packages and find ones which are a newer
# version than what we have
for ( name, arch ) in instDict.keys():
if ( name, arch ) in availDict.keys():
# Exact arch upgrade
h = instDict[(name, arch)]
pkg = availDict[(name,arch)]
comparePackageForUpgrade(updDict, h, pkg)
else:
# See if we have a better arch than that installed
if name in availNames.keys():
bestarch = findBestArch(arch, availNames[name])
if availDict.has_key((name,bestarch)):
h = instDict[(name,arch)]
pkg = availDict[(name,bestarch)]
comparePackageForUpgrade(updDict, h, pkg)
# handle obsoletes
for pkg in hdlist:
if (pkg[rpm.RPMTAG_NAME],pkg[rpm.RPMTAG_ARCH]) in updDict.keys():
# dEBUG("%(name)s %(arch)s is already selected" % pkg)
continue
if pkg[rpm.RPMTAG_OBSOLETENAME] is not None:
name = pkg[rpm.RPMTAG_NAME]
arch = pkg[rpm.RPMTAG_ARCH]
for obs,obsver in zip(pkg[rpm.RPMTAG_OBSOLETENAME],pkg[rpm.RPMTAG_OBSOLETEVERSION]):
mi = ts.dbMatch('name', obs)
oevr = strToVersion(obsver)
for h in mi:
if not obsver:
# unversioned obsoletes win
addNewPackageToUpgSet(updDict, pkg)
# dEBUG("adding %(name)s to the upgrade set for obsoletes" % pkg)
break
else:
if h[rpm.RPMTAG_EPOCH] is None:
epoch = '0'
else:
epoch = str(h[rpm.RPMTAG_EPOCH])
val = rpm.labelCompare(oevr,(epoch,h[rpm.RPMTAG_VERSION],h[rpm.RPMTAG_RELEASE]))
if val > 0:
# dEBUG("adding %(name)s %(version)s to the upgrade set for obsoletes" % pkg)
updDict[(name,arch)] = pkg
break
return updDict.values()
def strToVersion(str):
"""Parse a string such as in obsoleteversion into evr.
Gratuitously borrowed from yum str_to_version
FIXME: should be implemented in and use rpmUtils"""
i = string.find(str, ':')
if i != -1:
epoch = string.atol(str[:i])
else:
epoch = '0'
j = string.find(str, '-')
if j != -1:
if str[i + 1:j] == '':
version = None
else: version = str[i + 1:j]
release = str[j + 1:]
else:
if str[i + 1:] == '':
version = None
else:
version = str[i + 1:]
release = None
return (epoch, version, release)
if __name__ == "__main__":
import sys, os
if len(sys.argv) < 2:
print "Usage: %s /path/to/tree [rootpath]" %(sys.argv[0],)
sys.exit(0)
tree = sys.argv[1]
if len(sys.argv) >= 3:
instPath = sys.argv[2]
else:
instPath = "/"
fd = os.open("%s/%s/base/hdlist" %(tree, productPath), os.O_RDONLY)
hdlist = rpm.readHeaderListFromFD(fd)
os.close(fd)
packages = findpackageset(hdlist, instPath)
for pkg in packages:
print pkg[rpm.RPMTAG_NAME]
|