From 9a651e44ce34e092708b9a0df40de3f112132199 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Sun, 15 Jun 2014 17:46:47 -0400 Subject: Move parsing code into helpers module This way common test actions can be easily reused by multiple tests. Signed-off-by: Simo Sorce --- tests/test1.py | 243 ++++----------------------------------------------------- 1 file changed, 17 insertions(+), 226 deletions(-) (limited to 'tests/test1.py') diff --git a/tests/test1.py b/tests/test1.py index 6e76f88..411ac6e 100755 --- a/tests/test1.py +++ b/tests/test1.py @@ -18,253 +18,44 @@ # along with this program. If not, see . -from lxml import html +from helpers import http # pylint: disable=relative-import import os import pwd -import requests import sys -import urlparse -def get_session(srvs, url): - for srv in srvs: - if url.startswith(srv['baseuri']): - return srv['session'] - - raise ValueError("Unknown URL: %s" % url) - - -def get_url(srvs, url, **kwargs): - session = get_session(srvs, url) - return session.get(url, allow_redirects=False, **kwargs) - - -def post_url(srvs, url, **kwargs): - session = get_session(srvs, url) - return session.post(url, allow_redirects=False, **kwargs) - - -def access_url(action, srvs, url, **kwargs): - if action == 'get': - return get_url(srvs, url, **kwargs) - elif action == 'post': - return post_url(srvs, url, **kwargs) - else: - raise ValueError("Unknown action type: [%s]" % action) - - -def get_new_url(referer, action): - if action.startswith('/'): - u = urlparse.urlparse(referer) - return '%s://%s%s' % (u.scheme, u.netloc, action) - return action - - -def parse_first(tree, rule): - result = tree.xpath(rule) - if type(result) is list: - if len(result) > 0: - result = result[0] - else: - result = None - return result - - -def parse_list(tree, rule): - result = tree.xpath(rule) - if type(result) is list: - return result - return [result] - - -def handle_login_form(idp, r): - tree = html.fromstring(r.text) - try: - action_url = parse_first(tree, '//form[@id="login_form"]/@action') - method = parse_first(tree, '//form[@id="login_form"]/@method') - except Exception: # pylint: disable=broad-except - return [] - - if action_url is None: - return [] - - headers = {'referer': r.url} - payload = {'login_name': idp['user'], - 'login_password': idp['pass']} - - return [method, - get_new_url(r.url, action_url), - {'headers': headers, 'data': payload}] - - -def handle_return_form(r): - tree = html.fromstring(r.text) - try: - action_url = parse_first(tree, '//form[@id="saml-response"]/@action') - method = parse_first(tree, '//form[@id="saml-response"]/@method') - names = parse_list(tree, '//form[@id="saml-response"]/input/@name') - values = parse_list(tree, '//form[@id="saml-response"]/input/@value') - except Exception: # pylint: disable=broad-except - return [] - - if action_url is None: - return [] - - headers = {'referer': r.url} - payload = {} - for i in range(0, len(names)): - payload[names[i]] = values[i] - - return [method, - get_new_url(r.url, action_url), - {'headers': headers, 'data': payload}] - - -def go_to_url(srvs, idp, start_url, target_url): - - url = start_url - action = 'get' - args = {} - - good = True - while good: - r = access_url(action, srvs, url, **args) # pylint: disable=star-args - if r.status_code == 303: - url = r.headers['location'] - action = 'get' - args = {} - elif r.status_code == 200: - if url == target_url: - return r.text - - result = handle_login_form(idp, r) - if result: - action = result[0] - url = result[1] - args = result[2] - continue - - result = handle_return_form(r) - if result: - action = result[0] - url = result[1] - args = result[2] - continue - - raise ValueError("Unhandled Success code at url %s" % url) - - else: - good = False - - raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url)) - - -def auth_to_idp(idp): - - target_url = '%s/%s/' % (idp['baseuri'], idp['name']) - srvs = [idp] +if __name__ == '__main__': + basedir = sys.argv[1] - r = access_url('get', srvs, target_url) - if r.status_code != 200: - print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r) - return False + idpname = 'idp1' + spname = 'sp1' + user = pwd.getpwuid(os.getuid())[0] - tree = html.fromstring(r.text) - try: - expected = 'Log In' - login = parse_first(tree, '//div[@id="content"]/p/a/text()') - if login != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - login) - href = parse_first(tree, '//div[@id="content"]/p/a/@href') - start_url = get_new_url(target_url, href) - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False + sess = http.HttpSessions() + sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon') + sess.add_server(spname, 'http://127.0.0.11:45081') + print "test1: Authenticate to IDP ...", try: - page = go_to_url(srvs, idp, start_url, target_url) + sess.auth_to_idp(idpname) except Exception, e: # pylint: disable=broad-except print >> sys.stderr, " ERROR: %s" % repr(e) - return False - - tree = html.fromstring(page) - try: - welcome = parse_first(tree, '//div[@id="welcome"]/p/text()') - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False - - expected = 'Welcome %s!' % idp['user'] - if welcome != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - welcome) - return False - - return True - - -def add_sp_metadata(idp, sp): - url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'], - idp['name']) - headers = {'referer': url} - payload = {'name': sp['name']} - m = requests.get('%s/saml2/metadata' % sp['baseuri']) - metafile = {'metafile': m.content} - r = idp['session'].post(url, headers=headers, - data=payload, files=metafile) - if r.status_code != 200: - print >> sys.stderr, " ERROR: %s" % repr(r) - return False - - tree = html.fromstring(r.text) - try: - alert = parse_first(tree, - '//div[@class="alert alert-success"]/p/text()') - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False - - expected = 'SP Successfully added' - if alert != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - alert) - return False - - return True - - -if __name__ == '__main__': - basedir = sys.argv[1] - - idpsrv = {'name': 'idp1', - 'baseuri': 'http://127.0.0.10:45080', - 'session': requests.Session(), - 'user': pwd.getpwuid(os.getuid())[0], - 'pass': 'ipsilon'} - spsrv = {'name': 'sp1', - 'baseuri': 'http://127.0.0.11:45081', - 'session': requests.Session()} - - print "test1: Authenticate to IDP ...", - if not auth_to_idp(idpsrv): sys.exit(1) print " SUCCESS" print "test1: Add SP Metadata to IDP ...", - if not add_sp_metadata(idpsrv, spsrv): + try: + sess.add_sp_metadata(idpname, spname) + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: %s" % repr(e) sys.exit(1) print " SUCCESS" print "test1: Access SP Protected Area ...", - servers = [idpsrv, spsrv] - spurl = '%s/sp/' % (spsrv['baseuri']) try: - text = go_to_url(servers, idpsrv, spurl, spurl) + page = sess.fetch_page(idpname, 'http://127.0.0.11:45081/sp/') + page.expected_value('text()', 'WORKS!') except ValueError, e: print >> sys.stderr, " ERROR: %s" % repr(e) sys.exit(1) - if text != "WORKS!": - print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text - sys.exit(1) print " SUCCESS" -- cgit