summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorSimo Sorce <simo@redhat.com>2014-06-15 17:46:47 -0400
committerSimo Sorce <simo@redhat.com>2014-06-15 18:04:00 -0400
commit9a651e44ce34e092708b9a0df40de3f112132199 (patch)
tree74f592f5de8c10f049e18be85b48f629008dcc5c /tests
parentf9abb07cc479008c6ee7f72e0c911768c7c9bd82 (diff)
downloadipsilon-9a651e44ce34e092708b9a0df40de3f112132199.tar.gz
ipsilon-9a651e44ce34e092708b9a0df40de3f112132199.tar.xz
ipsilon-9a651e44ce34e092708b9a0df40de3f112132199.zip
Move parsing code into helpers module
This way common test actions can be easily reused by multiple tests. Signed-off-by: Simo Sorce <simo@redhat.com>
Diffstat (limited to 'tests')
-rw-r--r--tests/helpers/__init__.py0
-rwxr-xr-xtests/helpers/http.py241
-rwxr-xr-xtests/test1.py243
3 files changed, 258 insertions, 226 deletions
diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/helpers/__init__.py
diff --git a/tests/helpers/http.py b/tests/helpers/http.py
new file mode 100755
index 0000000..425d6b7
--- /dev/null
+++ b/tests/helpers/http.py
@@ -0,0 +1,241 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2014 Simo Sorce <simo@redhat.com>
+#
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+from lxml import html
+import requests
+import string
+import urlparse
+
+
+class WrongPage(Exception):
+ pass
+
+
+class PageTree(object):
+
+ def __init__(self, result):
+ self.result = result
+ self.text = result.text
+ self._tree = None
+
+ @property
+ def tree(self):
+ if not self._tree:
+ self._tree = html.fromstring(self.text)
+ return self._tree
+
+ def first_value(self, rule):
+ result = self.tree.xpath(rule)
+ if type(result) is list:
+ if len(result) > 0:
+ result = result[0]
+ else:
+ result = None
+ return result
+
+ def all_values(self, rule):
+ result = self.tree.xpath(rule)
+ if type(result) is list:
+ return result
+ return [result]
+
+ def make_referer(self):
+ return self.result.url
+
+ def expected_value(self, rule, expected):
+ value = self.first_value(rule)
+ if value != expected:
+ raise ValueError("Expected [%s], got [%s]" % (expected, value))
+
+
+class HttpSessions(object):
+
+ def __init__(self):
+ self.servers = dict()
+
+ def add_server(self, name, baseuri, user=None, pwd=None):
+ new = {'baseuri': baseuri,
+ 'session': requests.Session()}
+ if user:
+ new['user'] = user
+ if pwd:
+ new['pwd'] = pwd
+ self.servers[name] = new
+
+ def get_session(self, url):
+ for srv in self.servers:
+ d = self.servers[srv]
+ if url.startswith(d['baseuri']):
+ return d['session']
+
+ raise ValueError("Unknown URL: %s" % url)
+
+ def get(self, url, **kwargs):
+ session = self.get_session(url)
+ return session.get(url, allow_redirects=False, **kwargs)
+
+ def post(self, url, **kwargs):
+ session = self.get_session(url)
+ return session.post(url, allow_redirects=False, **kwargs)
+
+ def access(self, action, url, **kwargs):
+ action = string.lower(action)
+ if action == 'get':
+ return self.get(url, **kwargs)
+ elif action == 'post':
+ return self.post(url, **kwargs)
+ else:
+ raise ValueError("Unknown action type: [%s]" % action)
+
+ def new_url(self, referer, action):
+ if action.startswith('/'):
+ u = urlparse.urlparse(referer)
+ return '%s://%s%s' % (u.scheme, u.netloc, action)
+ return action
+
+ def get_form_data(self, page, form_id, input_fields):
+ values = []
+ action = page.first_value('//form[@id="%s"]/@action' % form_id)
+ values.append(action)
+ method = page.first_value('//form[@id="%s"]/@method' % form_id)
+ values.append(method)
+ for field in input_fields:
+ value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
+ field))
+ values.append(value)
+ return values
+
+ def handle_login_form(self, idp, page):
+ if type(page) != PageTree:
+ raise TypeError("Expected PageTree object")
+
+ srv = self.servers[idp]
+
+ try:
+ results = self.get_form_data(page, "login_form", [])
+ action_url = results[0]
+ method = results[1]
+ if action_url is None:
+ raise Exception
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not a Login Form Page")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+ payload = {'login_name': srv['user'],
+ 'login_password': srv['pwd']}
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
+ def handle_return_form(self, page):
+ if type(page) != PageTree:
+ raise TypeError("Expected PageTree object")
+
+ try:
+ results = self.get_form_data(page, "saml-response",
+ ["name", "value"])
+ action_url = results[0]
+ if action_url is None:
+ raise Exception
+ method = results[1]
+ names = results[2]
+ values = results[3]
+ except Exception: # pylint: disable=broad-except
+ raise WrongPage("Not a Return Form Page")
+
+ referer = page.make_referer()
+ headers = {'referer': referer}
+
+ payload = {}
+ for i in range(0, len(names)):
+ payload[names[i]] = values[i]
+
+ return [method, self.new_url(referer, action_url),
+ {'headers': headers, 'data': payload}]
+
+ def fetch_page(self, idp, target_url):
+ url = target_url
+ action = 'get'
+ args = {}
+
+ while True:
+ r = self.access(action, url, **args) # pylint: disable=star-args
+ if r.status_code == 303:
+ url = r.headers['location']
+ action = 'get'
+ args = {}
+ elif r.status_code == 200:
+ page = PageTree(r)
+
+ try:
+ (action, url, args) = self.handle_login_form(idp, page)
+ continue
+ except WrongPage:
+ pass
+
+ try:
+ (action, url, args) = self.handle_return_form(page)
+ continue
+ except WrongPage:
+ pass
+
+ # Either we got what we wanted, or we have to stop anyway
+ return page
+ else:
+ raise ValueError("Unhandled status (%d) on url %s" % (
+ r.status_code, url))
+
+ def auth_to_idp(self, idp):
+
+ srv = self.servers[idp]
+ target_url = '%s/%s/' % (srv['baseuri'], idp)
+
+ r = self.access('get', target_url)
+ if r.status_code != 200:
+ raise ValueError("Access to idp failed: %s" % repr(r))
+
+ page = PageTree(r)
+ page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
+ href = page.first_value('//div[@id="content"]/p/a/@href')
+ url = self.new_url(target_url, href)
+
+ page = self.fetch_page(idp, url)
+ page.expected_value('//div[@id="welcome"]/p/text()',
+ 'Welcome %s!' % srv['user'])
+
+ def add_sp_metadata(self, idp, sp):
+ idpsrv = self.servers[idp]
+ idpuri = idpsrv['baseuri']
+ spuri = self.servers[sp]['baseuri']
+
+ url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
+ headers = {'referer': url}
+ payload = {'name': sp}
+ m = requests.get('%s/saml2/metadata' % spuri)
+ metafile = {'metafile': m.content}
+ r = idpsrv['session'].post(url, headers=headers,
+ data=payload, files=metafile)
+ if r.status_code != 200:
+ raise ValueError('Failed to post SP data [%s]' % repr(r))
+
+ page = PageTree(r)
+ page.expected_value('//div[@class="alert alert-success"]/p/text()',
+ 'SP Successfully added')
diff --git a/tests/test1.py b/tests/test1.py
index 6e76f88..411ac6e 100755
--- a/tests/test1.py
+++ b/tests/test1.py
@@ -18,253 +18,44 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from lxml import html
+from helpers import http # pylint: disable=relative-import
import os
import pwd
-import requests
import sys
-import urlparse
-def get_session(srvs, url):
- for srv in srvs:
- if url.startswith(srv['baseuri']):
- return srv['session']
-
- raise ValueError("Unknown URL: %s" % url)
-
-
-def get_url(srvs, url, **kwargs):
- session = get_session(srvs, url)
- return session.get(url, allow_redirects=False, **kwargs)
-
-
-def post_url(srvs, url, **kwargs):
- session = get_session(srvs, url)
- return session.post(url, allow_redirects=False, **kwargs)
-
-
-def access_url(action, srvs, url, **kwargs):
- if action == 'get':
- return get_url(srvs, url, **kwargs)
- elif action == 'post':
- return post_url(srvs, url, **kwargs)
- else:
- raise ValueError("Unknown action type: [%s]" % action)
-
-
-def get_new_url(referer, action):
- if action.startswith('/'):
- u = urlparse.urlparse(referer)
- return '%s://%s%s' % (u.scheme, u.netloc, action)
- return action
-
-
-def parse_first(tree, rule):
- result = tree.xpath(rule)
- if type(result) is list:
- if len(result) > 0:
- result = result[0]
- else:
- result = None
- return result
-
-
-def parse_list(tree, rule):
- result = tree.xpath(rule)
- if type(result) is list:
- return result
- return [result]
-
-
-def handle_login_form(idp, r):
- tree = html.fromstring(r.text)
- try:
- action_url = parse_first(tree, '//form[@id="login_form"]/@action')
- method = parse_first(tree, '//form[@id="login_form"]/@method')
- except Exception: # pylint: disable=broad-except
- return []
-
- if action_url is None:
- return []
-
- headers = {'referer': r.url}
- payload = {'login_name': idp['user'],
- 'login_password': idp['pass']}
-
- return [method,
- get_new_url(r.url, action_url),
- {'headers': headers, 'data': payload}]
-
-
-def handle_return_form(r):
- tree = html.fromstring(r.text)
- try:
- action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
- method = parse_first(tree, '//form[@id="saml-response"]/@method')
- names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
- values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
- except Exception: # pylint: disable=broad-except
- return []
-
- if action_url is None:
- return []
-
- headers = {'referer': r.url}
- payload = {}
- for i in range(0, len(names)):
- payload[names[i]] = values[i]
-
- return [method,
- get_new_url(r.url, action_url),
- {'headers': headers, 'data': payload}]
-
-
-def go_to_url(srvs, idp, start_url, target_url):
-
- url = start_url
- action = 'get'
- args = {}
-
- good = True
- while good:
- r = access_url(action, srvs, url, **args) # pylint: disable=star-args
- if r.status_code == 303:
- url = r.headers['location']
- action = 'get'
- args = {}
- elif r.status_code == 200:
- if url == target_url:
- return r.text
-
- result = handle_login_form(idp, r)
- if result:
- action = result[0]
- url = result[1]
- args = result[2]
- continue
-
- result = handle_return_form(r)
- if result:
- action = result[0]
- url = result[1]
- args = result[2]
- continue
-
- raise ValueError("Unhandled Success code at url %s" % url)
-
- else:
- good = False
-
- raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
-
-
-def auth_to_idp(idp):
-
- target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
- srvs = [idp]
+if __name__ == '__main__':
+ basedir = sys.argv[1]
- r = access_url('get', srvs, target_url)
- if r.status_code != 200:
- print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
- return False
+ idpname = 'idp1'
+ spname = 'sp1'
+ user = pwd.getpwuid(os.getuid())[0]
- tree = html.fromstring(r.text)
- try:
- expected = 'Log In'
- login = parse_first(tree, '//div[@id="content"]/p/a/text()')
- if login != expected:
- print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
- login)
- href = parse_first(tree, '//div[@id="content"]/p/a/@href')
- start_url = get_new_url(target_url, href)
- except Exception, e: # pylint: disable=broad-except
- print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
- return False
+ sess = http.HttpSessions()
+ sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon')
+ sess.add_server(spname, 'http://127.0.0.11:45081')
+ print "test1: Authenticate to IDP ...",
try:
- page = go_to_url(srvs, idp, start_url, target_url)
+ sess.auth_to_idp(idpname)
except Exception, e: # pylint: disable=broad-except
print >> sys.stderr, " ERROR: %s" % repr(e)
- return False
-
- tree = html.fromstring(page)
- try:
- welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
- except Exception, e: # pylint: disable=broad-except
- print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
- return False
-
- expected = 'Welcome %s!' % idp['user']
- if welcome != expected:
- print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
- welcome)
- return False
-
- return True
-
-
-def add_sp_metadata(idp, sp):
- url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
- idp['name'])
- headers = {'referer': url}
- payload = {'name': sp['name']}
- m = requests.get('%s/saml2/metadata' % sp['baseuri'])
- metafile = {'metafile': m.content}
- r = idp['session'].post(url, headers=headers,
- data=payload, files=metafile)
- if r.status_code != 200:
- print >> sys.stderr, " ERROR: %s" % repr(r)
- return False
-
- tree = html.fromstring(r.text)
- try:
- alert = parse_first(tree,
- '//div[@class="alert alert-success"]/p/text()')
- except Exception, e: # pylint: disable=broad-except
- print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
- return False
-
- expected = 'SP Successfully added'
- if alert != expected:
- print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
- alert)
- return False
-
- return True
-
-
-if __name__ == '__main__':
- basedir = sys.argv[1]
-
- idpsrv = {'name': 'idp1',
- 'baseuri': 'http://127.0.0.10:45080',
- 'session': requests.Session(),
- 'user': pwd.getpwuid(os.getuid())[0],
- 'pass': 'ipsilon'}
- spsrv = {'name': 'sp1',
- 'baseuri': 'http://127.0.0.11:45081',
- 'session': requests.Session()}
-
- print "test1: Authenticate to IDP ...",
- if not auth_to_idp(idpsrv):
sys.exit(1)
print " SUCCESS"
print "test1: Add SP Metadata to IDP ...",
- if not add_sp_metadata(idpsrv, spsrv):
+ try:
+ sess.add_sp_metadata(idpname, spname)
+ except Exception, e: # pylint: disable=broad-except
+ print >> sys.stderr, " ERROR: %s" % repr(e)
sys.exit(1)
print " SUCCESS"
print "test1: Access SP Protected Area ...",
- servers = [idpsrv, spsrv]
- spurl = '%s/sp/' % (spsrv['baseuri'])
try:
- text = go_to_url(servers, idpsrv, spurl, spurl)
+ page = sess.fetch_page(idpname, 'http://127.0.0.11:45081/sp/')
+ page.expected_value('text()', 'WORKS!')
except ValueError, e:
print >> sys.stderr, " ERROR: %s" % repr(e)
sys.exit(1)
- if text != "WORKS!":
- print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text
- sys.exit(1)
print " SUCCESS"