diff options
Diffstat (limited to 'tests/helpers/http.py')
-rwxr-xr-x | tests/helpers/http.py | 54 |
1 files changed, 40 insertions, 14 deletions
diff --git a/tests/helpers/http.py b/tests/helpers/http.py index ff696d4..0da7ee2 100755 --- a/tests/helpers/http.py +++ b/tests/helpers/http.py @@ -24,6 +24,7 @@ import string import urlparse import json from urllib import urlencode +from requests_kerberos import HTTPKerberosAuth, OPTIONAL class WrongPage(Exception): @@ -89,18 +90,25 @@ class HttpSessions(object): raise ValueError("Unknown URL: %s" % url) - def get(self, url, **kwargs): + def get(self, url, krb=False, **kwargs): session = self.get_session(url) - return session.get(url, allow_redirects=False, **kwargs) + allow_redirects = False + if krb: + # In at least the test instance we don't get back a negotiate + # blob to do mutual authentication against. + kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL) + kwargs['auth'] = kerberos_auth + allow_redirects = True + return session.get(url, allow_redirects=allow_redirects, **kwargs) def post(self, url, **kwargs): session = self.get_session(url) return session.post(url, allow_redirects=False, **kwargs) - def access(self, action, url, **kwargs): + def access(self, action, url, krb=False, **kwargs): action = string.lower(action) if action == 'get': - return self.get(url, **kwargs) + return self.get(url, krb, **kwargs) elif action == 'post': return self.post(url, **kwargs) else: @@ -242,19 +250,39 @@ class HttpSessions(object): return [method, self.new_url(referer, action_url), {'headers': headers, 'data': payload}] - def fetch_page(self, idp, target_url, follow_redirect=True): + def fetch_page(self, idp, target_url, follow_redirect=True, krb=False): + """ + Fetch a page and parse the response code to determine what to do + next. + + The login process consists of redirections (302/303) and + potentially an unauthorized (401). For the case of unauthorized + try the page returned in case of fallback authentication. + """ url = target_url action = 'get' args = {} while True: - r = self.access(action, url, **args) # pylint: disable=star-args + # pylint: disable=star-args + r = self.access(action, url, krb=krb, **args) if r.status_code == 303 or r.status_code == 302: if not follow_redirect: return PageTree(r) url = r.headers['location'] action = 'get' args = {} + elif r.status_code == 401: + page = PageTree(r) + if r.headers.get('WWW-Authenticate', None) is None: + return page + + # Fall back, hopefully to testauth authentication. + try: + (action, url, args) = self.handle_login_form(idp, page) + continue + except WrongPage: + pass elif r.status_code == 200: page = PageTree(r) @@ -288,12 +316,12 @@ class HttpSessions(object): raise ValueError("Unhandled status (%d) on url %s" % ( r.status_code, url)) - def auth_to_idp(self, idp): + def auth_to_idp(self, idp, krb=False): srv = self.servers[idp] target_url = '%s/%s/' % (srv['baseuri'], idp) - r = self.access('get', target_url) + r = self.access('get', target_url, krb=krb) if r.status_code != 200: raise ValueError("Access to idp failed: %s" % repr(r)) @@ -302,7 +330,8 @@ class HttpSessions(object): href = page.first_value('//div[@id="content"]/p/a/@href') url = self.new_url(target_url, href) - page = self.fetch_page(idp, url) + page = self.fetch_page(idp, url, krb=krb) + page.expected_value('//div[@id="welcome"]/p/text()', 'Welcome %s!' % srv['user']) @@ -325,7 +354,6 @@ class HttpSessions(object): def add_sp_metadata(self, idp, sp, rest=False): expected_status = 200 - idpsrv = self.servers[idp] (idpuri, m) = self.get_sp_metadata(idp, sp) url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp) headers = {'referer': url} @@ -334,13 +362,11 @@ class HttpSessions(object): payload = {'metadata': m.content} headers['content-type'] = 'application/x-www-form-urlencoded' url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp) - r = idpsrv['session'].post(url, headers=headers, - data=urlencode(payload)) + r = self.post(url, headers=headers, data=urlencode(payload)) else: metafile = {'metafile': m.content} payload = {'name': sp} - r = idpsrv['session'].post(url, headers=headers, - data=payload, files=metafile) + r = self.post(url, headers=headers, data=payload, files=metafile) if r.status_code != expected_status: raise ValueError('Failed to post SP data [%s]' % repr(r)) |