1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
|
import rpm
import string
from constants import *
# set DB_PRIVATE to make rpm happy
rpm.addMacro("__dbi_cdb", "create private mpool mp_mmapsize=16Mb mp_size=1Mb")
def dEBUG(str):
print str
def addNewPackageToUpgSet(pkgDict, pkg):
"""Check to see if there's already a pkg by the name of pkg already
in our dictionary. If not, add this one. If there is, see if
this one is 'newer' or has a 'better' arch."""
name = pkg[rpm.RPMTAG_NAME]
if not pkgDict.has_key(name):
# nope
pkgDict[name] = pkg
else:
# first check version
val = rpm.versionCompare(pkgDict[name], pkg)
if val < 0:
# we're newer, add this one
pkgDict[name] = pkg
elif val == 0:
# same version, so check the architecture
newscore = rpm.archscore(pkg[rpm.RPMTAG_ARCH])
oldscore = pkgDict[name][rpm.RPMTAG_ARCH]
if newscore and newscore < oldscore:
# if the score is less, we're "better"
pkgDict[name] = pkg
def findpackageset(hdrlist, dbPath='/'):
ts = rpm.TransactionSet(dbPath)
ts.setVSFlags(~(rpm.RPMVSF_NORSA|rpm.RPMVSF_NODSA|rpm.RPMVSF_NOMD5))
pkgDict = {}
# go through and figure out which packages in the header list are
# actually applicable for our architecture
pkgDict = {}
for h in hdrlist:
score1 = rpm.archscore(h[rpm.RPMTAG_ARCH])
if (score1):
name = h[rpm.RPMTAG_NAME]
if pkgDict.has_key(name):
score2 = rpm.archscore(pkgDict[name][rpm.RPMTAG_ARCH])
if (score1 < score2):
pkgDict[name] = h
else:
pkgDict[name] = h
hdlist = pkgDict.values()
pkgDict = {}
# loop through packages and find ones which are a newer
# version than what we have
for pkg in hdlist:
mi = ts.dbMatch('name', pkg[rpm.RPMTAG_NAME])
for h in mi:
val = rpm.versionCompare(h, pkg)
if (val > 0):
# dEBUG("found older version of %(name)s" % h)
pass
elif (val < 0):
# dEBUG("found newer version of %(name)s" % h)
# check if we already have this package in our dictionary
addNewPackageToUpgSet(pkgDict, pkg)
else:
# dEBUG("found same verison of %(name)s" % h)
pass
# handle obsoletes
for pkg in hdlist:
if pkg[rpm.RPMTAG_NAME] in pkgDict.keys():
# dEBUG("%(name)s is already selected" % pkg)
continue
if pkg[rpm.RPMTAG_OBSOLETENAME] is not None:
for obs,obsver in zip(pkg[rpm.RPMTAG_OBSOLETENAME],pkg[rpm.RPMTAG_OBSOLETEVERSION]):
mi = ts.dbMatch('name', obs)
oevr = strToVersion(obsver)
for h in mi:
if not obsver:
# unversioned obsoletes win
addNewPackageToUpgSet(pkgDict, pkg)
break
else:
# dEBUG("adding %(name)s to the upgrade set for obsoletes" % pkg)
if h[rpm.RPMTAG_EPOCH] is None:
epoch = '0'
else:
epoch = str(h[rpm.RPMTAG_EPOCH])
val = rpm.labelCompare(oevr,(epoch,h[rpm.RPMTAG_VERSION],h[rpm.RPMTAG_RELEASE]))
if val > 0:
addNewPackageToUpgSet(pkgDict, pkg)
break
return pkgDict.values()
def strToVersion(str):
"""Parse a string such as in obsoleteversion into evr.
Gratuitously borrowed from yum str_to_version
FIXME: should be implemented in and use rpmUtils"""
i = string.find(str, ':')
if i != -1:
epoch = string.atol(str[:i])
else:
epoch = '0'
j = string.find(str, '-')
if j != -1:
if str[i + 1:j] == '':
version = None
else: version = str[i + 1:j]
release = str[j + 1:]
else:
if str[i + 1:] == '':
version = None
else:
version = str[i + 1:]
release = None
return (epoch, version, release)
if __name__ == "__main__":
import sys, os
if len(sys.argv) < 2:
print "Usage: %s /path/to/tree [rootpath]" %(sys.argv[0],)
sys.exit(0)
tree = sys.argv[1]
if len(sys.argv) >= 3:
instPath = sys.argv[2]
else:
instPath = "/"
fd = os.open("%s/%s/base/hdlist" %(tree, productPath), os.O_RDONLY)
hdlist = rpm.readHeaderListFromFD(fd)
os.close(fd)
packages = findpackageset(hdlist, instPath)
for pkg in packages:
print pkg[rpm.RPMTAG_NAME]
|