1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
|
"""
Use this class to access Nitrate via XML-RPC
This code is based on http://landfill.bugzilla.org/testopia2/testopia/contrib/drivers/python/testopia.py
and https://fedorahosted.org/python-bugzilla/browser/bugzilla/base.py
History:
2011-12-31 bugfix https://bugzilla.redhat.com/show_bug.cgi?id=735937
Example on how to access this library,
from nitrate import NitrateXmlrpc
n = NitrateXmlrpc.from_config('config.cfg')
n.testplan_get(10)
where config.cfg looks like:
[nitrate]
username: xkuang@redhat.com
password: foobar
url: https://tcms.engineering.redhat.com/xmlrpc/
use_mod_kerb: False
Or, more directly:
n = NitrateXmlrpc(
'xkuang@redhat.com',
'foobar',
'https://tcms.engineering.redhat.com/xmlrpc/',
)
n.testplan_get(10)
"""
import xmlrpclib, urllib2, httplib, kerberos
from types import *
from datetime import datetime, time
from cookielib import CookieJar
VERBOSE = 0
DEBUG = 0
class CookieResponse:
'''Fake HTTPResponse object that we can fill with headers we got elsewhere.
We can then pass it to CookieJar.extract_cookies() to make it pull out the
cookies from the set of headers we have.'''
def __init__(self,headers):
self.headers = headers
#log.debug("CookieResponse() headers = %s" % headers)
def info(self):
return self.headers
class CookieTransport(xmlrpclib.Transport):
'''A subclass of xmlrpclib.Transport that supports cookies.'''
cookiejar = None
scheme = 'http'
# Cribbed from xmlrpclib.Transport.send_user_agent
def send_cookies(self, connection, cookie_request):
if self.cookiejar is None:
self.cookiejar = CookieJar()
elif self.cookiejar:
# Let the cookiejar figure out what cookies are appropriate
self.cookiejar.add_cookie_header(cookie_request)
# Pull the cookie headers out of the request object...
cookielist=list()
for h,v in cookie_request.header_items():
if h.startswith('Cookie'):
cookielist.append([h,v])
# ...and put them over the connection
for h,v in cookielist:
connection.putheader(h,v)
# This is the same request() method from xmlrpclib.Transport,
# with a couple additions noted below
def request_with_cookies(self, host, handler, request_body, verbose=0):
h = self.make_connection(host)
if verbose:
h.set_debuglevel(1)
# ADDED: construct the URL and Request object for proper cookie handling
request_url = "%s://%s%s" % (self.scheme,host,handler)
#log.debug("request_url is %s" % request_url)
cookie_request = urllib2.Request(request_url)
self.send_request(h,handler,request_body)
self.send_host(h,host)
self.send_cookies(h,cookie_request) # ADDED. creates cookiejar if None.
self.send_user_agent(h)
self.send_content(h,request_body)
errcode, errmsg, headers = h.getreply()
# ADDED: parse headers and get cookies here
cookie_response = CookieResponse(headers)
# Okay, extract the cookies from the headers
self.cookiejar.extract_cookies(cookie_response,cookie_request)
#log.debug("cookiejar now contains: %s" % self.cookiejar._cookies)
# And write back any changes
if hasattr(self.cookiejar,'save'):
try:
self.cookiejar.save(self.cookiejar.filename)
except Exception, e:
raise
#log.error("Couldn't write cookiefile %s: %s" % \
# (self.cookiejar.filename,str(e)))
if errcode != 200:
# When runs here, the HTTPS connection isn't useful any more
# before raising an exception to caller
h.close()
raise xmlrpclib.ProtocolError(
host + handler,
errcode, errmsg,
headers
)
self.verbose = verbose
try:
sock = h._conn.sock
except AttributeError:
sock = None
try:
return self._parse_response(h.getfile(), sock)
finally:
h.close()
# This is just python 2.7's xmlrpclib.Transport.single_request, with
# send additions noted below to send cookies along with the request
def single_request_with_cookies(self, host, handler, request_body, verbose=0):
h = self.make_connection(host)
if verbose:
h.set_debuglevel(1)
# ADDED: construct the URL and Request object for proper cookie handling
request_url = "%s://%s%s" % (self.scheme,host,handler)
#log.debug("request_url is %s" % request_url)
cookie_request = urllib2.Request(request_url)
try:
self.send_request(h,handler,request_body)
self.send_host(h,host)
self.send_cookies(h,cookie_request) # ADDED. creates cookiejar if None.
self.send_user_agent(h)
self.send_content(h,request_body)
response = h.getresponse(buffering=True)
# ADDED: parse headers and get cookies here
cookie_response = CookieResponse(response.msg)
# Okay, extract the cookies from the headers
self.cookiejar.extract_cookies(cookie_response,cookie_request)
#log.debug("cookiejar now contains: %s" % self.cookiejar._cookies)
# And write back any changes
if hasattr(self.cookiejar,'save'):
try:
self.cookiejar.save(self.cookiejar.filename)
except Exception, e:
raise
#log.error("Couldn't write cookiefile %s: %s" % \
# (self.cookiejar.filename,str(e)))
if response.status == 200:
self.verbose = verbose
return self.parse_response(response)
if (response.getheader("content-length", 0)):
response.read()
raise xmlrpclib.ProtocolError(
host + handler,
response.status, response.reason,
response.msg,
)
except xmlrpclib.Fault:
raise
finally:
h.close()
# Override the appropriate request method
if hasattr(xmlrpclib.Transport, 'single_request'):
single_request = single_request_with_cookies # python 2.7+
else:
request = request_with_cookies # python 2.6 and earlier
class SafeCookieTransport(xmlrpclib.SafeTransport,CookieTransport):
'''SafeTransport subclass that supports cookies.'''
scheme = 'https'
# Override the appropriate request method
if hasattr(xmlrpclib.Transport, 'single_request'):
single_request = CookieTransport.single_request_with_cookies
else:
request = CookieTransport.request_with_cookies
# Stolen from FreeIPA source freeipa-1.2.1/ipa-python/krbtransport.py
class KerbTransport(SafeCookieTransport):
"""Handles Kerberos Negotiation authentication to an XML-RPC server."""
def get_host_info(self, host):
host, extra_headers, x509 = xmlrpclib.Transport.get_host_info(self, host)
# Set the remote host principal
h = host
hostinfo = h.split(':')
service = "HTTP@" + hostinfo[0]
try:
rc, vc = kerberos.authGSSClientInit(service);
except kerberos.GSSError, e:
raise kerberos.GSSError(e)
try:
kerberos.authGSSClientStep(vc, "");
except kerberos.GSSError, e:
raise kerberos.GSSError(e)
extra_headers = [
("Authorization", "negotiate %s" % kerberos.authGSSClientResponse(vc) )
]
return host, extra_headers, x509
def _python_ver_larger_than_2_6(self):
import sys
vi = sys.version_info
return vi[0] >= 2 and vi[1] > 6
def make_connection(self, host):
'''
For fixing bug #735937.
When running on Python 2.7, make_connection will do the same behavior as that of Python 2.6's xmlrpclib
That is in Python 2.6, make_connection will return an individual HTTPS connection for each request
'''
if self._python_ver_larger_than_2_6():
# create a HTTPS connection object from a host descriptor
# host may be a string, or a (host, x509-dict) tuple
try:
HTTPS = httplib.HTTPSConnection
except AttributeError:
raise NotImplementedError(
"your version of httplib doesn't support HTTPS"
)
else:
chost, self._extra_headers, x509 = self.get_host_info(host)
# nitrate isn't ready to use HTTP/1.1 persistent connection mechanism.
# So tell server current opened HTTP connection should be closed after request is handled.
# And there will be a new connection for next request.
self._extra_headers.append(('Connection', 'close'))
self._connection = host, HTTPS(chost, None, **(x509 or {}))
return self._connection[1]
else:
# For Python 2.6, do the default behavior
return SafeCookieTransport.make_connection(self, host)
class NitrateError(Exception):
pass
class NitrateXmlrpcError(Exception):
def __init__(self, verb, params, wrappedError):
self.verb = verb
self.params = params
self.wrappedError = wrappedError
def __str__(self):
return "Error while executing cmd '%s' --> %s" \
% ( self.verb + "(" + self.params + ")", self.wrappedError)
class NitrateXmlrpc(object):
"""
NitrateXmlrpc - Nitrate XML-RPC client
for server deployed without BASIC authentication
"""
@classmethod
def from_config(cls, filename):
from ConfigParser import SafeConfigParser
cp = SafeConfigParser()
cp.read([filename])
kwargs = dict(
[(key, cp.get('nitrate', key)) for key in [
'username', 'password', 'url'
]]
)
return NitrateXmlrpc(**kwargs)
def __init__(self, username, password, url, use_mod_auth_kerb = False):
if url.startswith('https://'):
self._transport = SafeCookieTransport()
elif url.startswith('http://'):
self._transport = CookieTransport()
else:
raise "Unrecognized URL scheme"
self._transport.cookiejar = CookieJar()
# print "COOKIES:", self._transport.cookiejar._cookies
self.server = xmlrpclib.ServerProxy(
url,
transport = self._transport,
verbose = VERBOSE,
allow_none = 1
)
# Login, get a cookie into our cookie jar:
login_dict = self.do_command("Auth.login", [dict(
username = username,
password = password,
)])
# Record the user ID in case the script wants this
# self.user_id = login_dict['id']
# print 'Logged in with cookie for user %i' % self.userId
# print "COOKIES:", self._transport.cookiejar._cookies
def _boolean_option(self, option, value):
"""Returns the boolean option when value is True or False, else ''
Example: _boolean_option('isactive', True) returns " 'isactive': 1,"
"""
if value or str(value) == 'False':
if type(value) is not BooleanType:
raise NitrateError("The value for the option '%s' is not of boolean type." % option)
elif value == False:
return "\'%s\':0, " % option
elif value == True:
return "\'%s\':1, " % option
return ''
def _datetime_option(self, option, value):
"""Returns the string 'option': 'value' where value is a date object formatted
in string as yyyy-mm-dd hh:mm:ss. If value is None, then we return ''.
Example: self._time_option('datetime', datetime(2007,12,05,13,01,03))
returns "'datetime': '2007-12-05 13:01:03'"
"""
if value:
if type(value) is not type(datetime(2000,01,01,12,00,00)):
raise NitrateError("The option '%s' is not a valid datetime object." % option)
return "\'%s\':\'%s\', " % (option, value.strftime("%Y-%m-%d %H:%M:%S"))
return ''
def _list_dictionary_option(self, option, value):
"""Verifies that the value passed for the option is in the format of a list
of dictionaries.
Example: _list_dictionary_option('plan':[{'key1': 'value1', 'key2': 'value2'}])
verifies that value is a list, then verifies that the content of value are dictionaries.
"""
if value: # Verify that value is a type of list
if type(value) is not ListType: # Verify that the content of value are dictionaries,
raise NitrateError("The option '%s' is not a valid list of dictionaries." % option)
else:
for item in value:
if type(item) is not DictType:
raise NitrateError("The option '%s' is not a valid list of dictionaries." % option)
return "\'%s\': %s" % (option, value)
return ''
_list_dict_op = _list_dictionary_option
def _number_option(self, option, value):
"""Returns the string " 'option': value," if value is not None, else ''
Example: self._number_option("isactive", 1) returns " 'isactive': 1,"
"""
if value:
if type(value) is not IntType:
raise NitrateError("The option '%s' is not a valid integer." % option)
return "\'%s\':%d, " % (option, value)
return ''
def _number_no_option(self, number):
"""Returns the number in number. Just a totally useless wrapper :-)
Example: self._number_no_option(1) returns 1
"""
if type(number) is not IntType:
raise NitrateError("The 'number' parameter is not an integer.")
return str(number)
_number_noop = _number_no_option
def _options_dict(self, *args):
"""Creates a wrapper around all the options into a dictionary format.
Example, if args is ['isactive': 1,", 'description', 'Voyage project'], then
the return will be {'isactive': 1,", 'description', 'Voyage project'}
"""
return "{%s}" % ''.join(args)
def _options_non_empty_dict(self, *args):
"""Creates a wrapper around all the options into a dictionary format and
verifies that the dictionary is not empty.
Example, if args is ['isactive': 1,", 'description', 'Voyage project'], then
the return will be {'isactive': 1,", 'description', 'Voyage project'}.
If args is empty, then we raise an error.
"""
if not args:
raise NitrateError, "At least one variable must be set."
return "{%s}" % ''.join(args)
_options_ne_dict = _options_non_empty_dict
def _string_option(self, option, value):
"""Returns the string 'option': 'value'. If value is None, then ''
Example: self._string_option('description', 'Voyage project') returns
"'description' : 'Voyage project',"
"""
if value:
if type(value) is not StringType:
raise NitrateError("The option '%s' is not a valid string." % option)
return "\'%s\':\'%s\', " % (option, value)
return ''
def _string_no_option(self, option):
"""Returns the string 'option'.
Example: self._string_no_option("description") returns "'description'"
"""
if option:
if type(option) is not StringType:
raise NitrateError("The option '%s' is not a valid string." % option)
return "\'%s\'" % option
return ''
_string_noop = _string_no_option
def _time_option(self, option, value):
"""Returns the string 'option': 'value' where value is a time object formatted in string as hh:mm:ss.
If value is None, then we return ''.
Example: self._time_option('time', time(12,00,03)) returns "'time': '12:00:03'"
"""
if value:
if type(value) is not type(time(12,00,00)):
raise NitrateError("The option '%s' is not a valid time object." % option)
return "\'%s\':\'%s\', " % (option, value.strftime("%H:%M:%S"))
return ''
def do_command(self, verb, args = []):
"""Submit a command to the server proxy.
'verb' -- string, the xmlrpc verb,
'args' -- list, the argument list,
"""
params = ''
for arg in args:
params = ("%s" % str(arg), "%s, %s" % (params, str(arg)))[params!='']
cmd = "self.server." + verb + "(" + params + ")"
if DEBUG:
print cmd
try:
return eval(cmd)
except xmlrpclib.Error, e:
raise NitrateXmlrpcError(verb, params, e)
############################## Build #######################################
def build_get(self, build_id):
"""Get A Build by ID.
'build_id' -- integer, Must be greater than 0
Example: build_get(10)
Result: A dictionary of key/value pairs for the attributes listed above
"""
return self.do_command("Build.get", [self._number_noop(build_id)])
############################## User ##################################
def get_me(self):
"""
Description: Get the information of myself
Returns: A blessed User object Hash
"""
return self.do_command("User.get_me")
class NitrateKerbXmlrpc(NitrateXmlrpc):
"""
NitrateXmlrpc - Nitrate XML-RPC client
for server deployed with mod_auth_kerb
"""
def __init__(self, url):
if url.startswith('https://'):
self._transport = KerbTransport()
elif url.startswith('http://'):
raise "Apache module mod_ssl is required by mod_auth_kerb for encrypt the communication."
else:
raise "Unrecognized URL scheme"
self._transport.cookiejar = CookieJar()
# print "COOKIES:", self._transport.cookiejar._cookies
self.server = xmlrpclib.ServerProxy(
url,
transport = self._transport,
verbose = VERBOSE,
allow_none = 1
)
# Login, get a cookie into our cookie jar:
login_dict = self.do_command("Auth.login_krbv", [])
if __name__ == "__main__":
from pprint import pprint
n = NitrateKerbXmlrpc('https://tcms.englab.nay.redhat.com/xmlrpc/')
pprint(n.get_me())
|