summaryrefslogtreecommitdiffstats
path: root/python/tests/websimulator.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/tests/websimulator.py')
-rw-r--r--python/tests/websimulator.py161
1 files changed, 61 insertions, 100 deletions
diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py
index 65064a0a..0c923d05 100644
--- a/python/tests/websimulator.py
+++ b/python/tests/websimulator.py
@@ -1,12 +1,11 @@
# -*- coding: UTF-8 -*-
-# Python Lasso Simulator
+# Lasso Simulator
+# By: Emmanuel Raviart <eraviart@entrouvert.com>
#
# Copyright (C) 2004 Entr'ouvert
# http://lasso.entrouvert.org
-#
-# Author: Emmanuel Raviart <eraviart@entrouvert.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -27,7 +26,10 @@
# FIXME: Rename webUser to userAccount.
-class HttpRequest(object):
+import abstractweb
+
+
+class HttpRequest(abstractweb.HttpRequestMixin, object):
client = None # Principal or web site sending the request.
body = None
form = None
@@ -46,10 +48,6 @@ class HttpRequest(object):
if form:
self.form = form
- def send(self):
- webSite = self.client.internet.getWebSite(self.url)
- return webSite.doHttpRequest(self)
-
def getFormField(self, name, default = 'none'):
if self.form and name in self.form:
return self.form[name]
@@ -57,49 +55,51 @@ class HttpRequest(object):
raise KeyError(name)
return default
- def getQueryBoolean(self, name, default = 'none'):
- try:
- fieldValue = self.getQueryField(name)
- except KeyError:
- if default == 'none':
- raise
- return default
- return fieldValue.lower not in ('', '0', 'false')
+ def getPath(self):
+ return self.pathAndQuery.split('?', 1)[0]
+
+ def getPathAndQuery(self):
+ urlWithoutScheme = self.url[self.url.find('://') + 3:]
+ if '/' in urlWithoutScheme:
+ pathAndQuery = urlWithoutScheme[urlWithoutScheme.find('/'):]
+ else:
+ pathAndQuery = ''
+ return pathAndQuery
def getQuery(self):
- splitedUrl = self.url.split('?', 1)
+ splitedUrl = self.pathAndQuery.split('?', 1)
if len(splitedUrl) > 1:
return splitedUrl[1]
else:
return ''
- def getQueryField(self, name, default = 'none'):
- if self.query:
- for field in self.query.split('&'):
- fieldName, fieldValue = field.split('=')
- if name == fieldName:
- return fieldValue
- if default == 'none':
- raise KeyError(name)
- return default
+ def getScheme(self):
+ return self.url.split(':', 1)[0].lower()
+ def send(self):
+ webSite = self.client.internet.getWebSite(self.url)
+ return webSite.handleHttpRequest(self)
+
+ path = property(getPath)
+ pathAndQuery = property(getPathAndQuery)
query = property(getQuery)
+ scheme = property(getScheme)
-class HttpResponse(object):
- body = None
- headers = None
- statusCode = None # 200 or...
- statusMessage = None
+class HttpResponse(abstractweb.HttpResponseMixin, object):
+ def send(self, httpRequestHandler):
+ return self
- def __init__(self, statusCode, statusMessage = None, headers = None, body = None):
- self.statusCode = statusCode
- if statusMessage:
- self.statusMessage = statusMessage
- if headers:
- self.headers = headers
- if body:
- self.body = body
+
+class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object):
+ HttpResponse = HttpResponse # Class
+
+ def __init__(self, webServer, httpRequest):
+ self.webServer = webServer
+ self.httpRequest = httpRequest
+
+ def respondRedirectTemporarily(self, url):
+ return self.httpRequest.client.redirect(url)
class Internet(object):
@@ -118,41 +118,10 @@ class Internet(object):
raise Exception('Unknown web site: %s' % url)
-class Simulation(object):
- test = None # The testing instance
-
- def __init__(self, test):
- self.test = test
-
- def fail(self, msg = None):
- return self.test.fail(msg)
-
- def failIf(self, expr, msg = None):
- return self.test.failIf(expr, msg)
-
- def failIfAlmostEqual(self, first, second, places = 7, msg = None):
- return self.test.failIfAlmostEqual(first, second, places, msg)
-
- def failIfEqual(self, first, second, msg = None):
- return self.test.failIfEqual(first, second, msg)
-
- def failUnless(self, expr, msg = None):
- return self.test.failUnless(expr, msg)
-
- def failUnlessAlmostEqual(self, first, second, places = 7, msg = None):
- return self.test.failUnlessAlmostEqual(first, second, places, msg)
-
- def failUnlessRaises(self, excClass, callableObj, *args, **kwargs):
- return self.test.failUnlessRaises(self, excClass, callableObj, *args, **kwargs)
-
- def failUnlessEqual(self, first, second, msg = None):
- return self.test.failUnlessEqual(first, second, msg)
-
-
class WebClient(object):
internet = None
keyring = None
- requestHeaders = {
+ httpRequestHeaders = {
'User-Agent': 'LassoSimulator/0.0.0',
'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html',
}
@@ -168,15 +137,14 @@ class WebClient(object):
return self.sendHttpRequest('GET', url)
def sendHttpRequest(self, method, url, headers = None, body = None, form = None):
- requestHeaders = self.requestHeaders
if headers:
- requestHeaders = self.requestHeaders.copy()
+ httpRequestHeaders = self.httpRequestHeaders.copy()
for name, value in headers.iteritems():
- requestHeaders[name] = value
+ httpRequestHeaders[name] = value
else:
- requestHeaders = self.requestHeaders
+ httpRequestHeaders = self.httpRequestHeaders
return HttpRequest(
- self, method, url, headers = requestHeaders, body = body, form = form).send()
+ self, method, url, headers = httpRequestHeaders, body = body, form = form).send()
def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None,
form = None):
@@ -225,20 +193,19 @@ class WebUser(object):
self.uniqueId = uniqueId
-class WebSite(WebClient, Simulation):
+class WebSite(WebClient):
"""Simulation of a web site"""
- lastWebSessionId = 0
- providerId = None # The Liberty providerID of this web site
- responseHeaders = {
+ httpResponseHeaders = {
'Server': 'Lasso Simulator Web Server',
}
+ lastWebSessionId = 0
+ providerId = None # The Liberty providerID of this web site
url = None # The main URL of web site
webUsers = None
webSessions = None
- def __init__(self, test, internet, url):
- Simulation.__init__(self, test)
+ def __init__(self, internet, url):
WebClient.__init__(self, internet)
self.url = url
self.webUserIdsByNameIdentifier = {}
@@ -257,14 +224,6 @@ class WebSite(WebClient, Simulation):
client.webSessionIds[self.url] = self.lastWebSessionId
return webSession
- def doHttpRequest(self, httpRequest):
- url = httpRequest.url
- if url.startswith(self.url):
- url = url[len(self.url):]
- methodName = url.split('?', 1)[0].replace('/', '')
- method = getattr(self, methodName)
- return method(httpRequest)
-
def getIdentityDump(self, principal):
webSession = self.getWebSession(principal)
webUser = self.getWebUserFromWebSession(webSession)
@@ -309,13 +268,15 @@ class WebSite(WebClient, Simulation):
return None
return self.webUsers.get(webUserId, None)
- def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None):
- responseHeaders = self.responseHeaders
- if headers:
- responseHeaders = self.responseHeaders.copy()
- for name, value in headers.iteritems():
- responseHeaders[name] = value
- else:
- responseHeaders = self.responseHeaders
- return HttpResponse(
- statusCode, statusMessage = statusMessage, headers = headers, body = body)
+ def handleHttpRequest(self, httpRequest):
+ httpRequestHandler = HttpRequestHandler(self, httpRequest)
+ return self.handleHttpRequestHandler(httpRequestHandler)
+
+ def handleHttpRequestHandler(self, httpRequestHandler):
+ methodName = httpRequestHandler.httpRequest.path.replace('/', '')
+ try:
+ method = getattr(self, methodName)
+ except AttributeError:
+ return httpRequestHandler.respond(
+ 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path)
+ return method(httpRequestHandler)