diff options
Diffstat (limited to 'python/tests/websimulator.py')
-rw-r--r-- | python/tests/websimulator.py | 161 |
1 files changed, 61 insertions, 100 deletions
diff --git a/python/tests/websimulator.py b/python/tests/websimulator.py index 65064a0a..0c923d05 100644 --- a/python/tests/websimulator.py +++ b/python/tests/websimulator.py @@ -1,12 +1,11 @@ # -*- coding: UTF-8 -*- -# Python Lasso Simulator +# Lasso Simulator +# By: Emmanuel Raviart <eraviart@entrouvert.com> # # Copyright (C) 2004 Entr'ouvert # http://lasso.entrouvert.org -# -# Author: Emmanuel Raviart <eraviart@entrouvert.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -27,7 +26,10 @@ # FIXME: Rename webUser to userAccount. -class HttpRequest(object): +import abstractweb + + +class HttpRequest(abstractweb.HttpRequestMixin, object): client = None # Principal or web site sending the request. body = None form = None @@ -46,10 +48,6 @@ class HttpRequest(object): if form: self.form = form - def send(self): - webSite = self.client.internet.getWebSite(self.url) - return webSite.doHttpRequest(self) - def getFormField(self, name, default = 'none'): if self.form and name in self.form: return self.form[name] @@ -57,49 +55,51 @@ class HttpRequest(object): raise KeyError(name) return default - def getQueryBoolean(self, name, default = 'none'): - try: - fieldValue = self.getQueryField(name) - except KeyError: - if default == 'none': - raise - return default - return fieldValue.lower not in ('', '0', 'false') + def getPath(self): + return self.pathAndQuery.split('?', 1)[0] + + def getPathAndQuery(self): + urlWithoutScheme = self.url[self.url.find('://') + 3:] + if '/' in urlWithoutScheme: + pathAndQuery = urlWithoutScheme[urlWithoutScheme.find('/'):] + else: + pathAndQuery = '' + return pathAndQuery def getQuery(self): - splitedUrl = self.url.split('?', 1) + splitedUrl = self.pathAndQuery.split('?', 1) if len(splitedUrl) > 1: return splitedUrl[1] else: return '' - def getQueryField(self, name, default = 'none'): - if self.query: - for field in self.query.split('&'): - fieldName, fieldValue = field.split('=') - if name == fieldName: - return fieldValue - if default == 'none': - raise KeyError(name) - return default + def getScheme(self): + return self.url.split(':', 1)[0].lower() + def send(self): + webSite = self.client.internet.getWebSite(self.url) + return webSite.handleHttpRequest(self) + + path = property(getPath) + pathAndQuery = property(getPathAndQuery) query = property(getQuery) + scheme = property(getScheme) -class HttpResponse(object): - body = None - headers = None - statusCode = None # 200 or... - statusMessage = None +class HttpResponse(abstractweb.HttpResponseMixin, object): + def send(self, httpRequestHandler): + return self - def __init__(self, statusCode, statusMessage = None, headers = None, body = None): - self.statusCode = statusCode - if statusMessage: - self.statusMessage = statusMessage - if headers: - self.headers = headers - if body: - self.body = body + +class HttpRequestHandler(abstractweb.HttpRequestHandlerMixin, object): + HttpResponse = HttpResponse # Class + + def __init__(self, webServer, httpRequest): + self.webServer = webServer + self.httpRequest = httpRequest + + def respondRedirectTemporarily(self, url): + return self.httpRequest.client.redirect(url) class Internet(object): @@ -118,41 +118,10 @@ class Internet(object): raise Exception('Unknown web site: %s' % url) -class Simulation(object): - test = None # The testing instance - - def __init__(self, test): - self.test = test - - def fail(self, msg = None): - return self.test.fail(msg) - - def failIf(self, expr, msg = None): - return self.test.failIf(expr, msg) - - def failIfAlmostEqual(self, first, second, places = 7, msg = None): - return self.test.failIfAlmostEqual(first, second, places, msg) - - def failIfEqual(self, first, second, msg = None): - return self.test.failIfEqual(first, second, msg) - - def failUnless(self, expr, msg = None): - return self.test.failUnless(expr, msg) - - def failUnlessAlmostEqual(self, first, second, places = 7, msg = None): - return self.test.failUnlessAlmostEqual(first, second, places, msg) - - def failUnlessRaises(self, excClass, callableObj, *args, **kwargs): - return self.test.failUnlessRaises(self, excClass, callableObj, *args, **kwargs) - - def failUnlessEqual(self, first, second, msg = None): - return self.test.failUnlessEqual(first, second, msg) - - class WebClient(object): internet = None keyring = None - requestHeaders = { + httpRequestHeaders = { 'User-Agent': 'LassoSimulator/0.0.0', 'Accept': 'text/xml,application/xml,application/xhtml+xml,text/html', } @@ -168,15 +137,14 @@ class WebClient(object): return self.sendHttpRequest('GET', url) def sendHttpRequest(self, method, url, headers = None, body = None, form = None): - requestHeaders = self.requestHeaders if headers: - requestHeaders = self.requestHeaders.copy() + httpRequestHeaders = self.httpRequestHeaders.copy() for name, value in headers.iteritems(): - requestHeaders[name] = value + httpRequestHeaders[name] = value else: - requestHeaders = self.requestHeaders + httpRequestHeaders = self.httpRequestHeaders return HttpRequest( - self, method, url, headers = requestHeaders, body = body, form = form).send() + self, method, url, headers = httpRequestHeaders, body = body, form = form).send() def sendHttpRequestToSite(self, webSite, method, path, headers = None, body = None, form = None): @@ -225,20 +193,19 @@ class WebUser(object): self.uniqueId = uniqueId -class WebSite(WebClient, Simulation): +class WebSite(WebClient): """Simulation of a web site""" - lastWebSessionId = 0 - providerId = None # The Liberty providerID of this web site - responseHeaders = { + httpResponseHeaders = { 'Server': 'Lasso Simulator Web Server', } + lastWebSessionId = 0 + providerId = None # The Liberty providerID of this web site url = None # The main URL of web site webUsers = None webSessions = None - def __init__(self, test, internet, url): - Simulation.__init__(self, test) + def __init__(self, internet, url): WebClient.__init__(self, internet) self.url = url self.webUserIdsByNameIdentifier = {} @@ -257,14 +224,6 @@ class WebSite(WebClient, Simulation): client.webSessionIds[self.url] = self.lastWebSessionId return webSession - def doHttpRequest(self, httpRequest): - url = httpRequest.url - if url.startswith(self.url): - url = url[len(self.url):] - methodName = url.split('?', 1)[0].replace('/', '') - method = getattr(self, methodName) - return method(httpRequest) - def getIdentityDump(self, principal): webSession = self.getWebSession(principal) webUser = self.getWebUserFromWebSession(webSession) @@ -309,13 +268,15 @@ class WebSite(WebClient, Simulation): return None return self.webUsers.get(webUserId, None) - def newHttpResponse(self, statusCode, statusMessage = None, headers = None, body = None): - responseHeaders = self.responseHeaders - if headers: - responseHeaders = self.responseHeaders.copy() - for name, value in headers.iteritems(): - responseHeaders[name] = value - else: - responseHeaders = self.responseHeaders - return HttpResponse( - statusCode, statusMessage = statusMessage, headers = headers, body = body) + def handleHttpRequest(self, httpRequest): + httpRequestHandler = HttpRequestHandler(self, httpRequest) + return self.handleHttpRequestHandler(httpRequestHandler) + + def handleHttpRequestHandler(self, httpRequestHandler): + methodName = httpRequestHandler.httpRequest.path.replace('/', '') + try: + method = getattr(self, methodName) + except AttributeError: + return httpRequestHandler.respond( + 404, 'Path "%s" Not Found.' % httpRequestHandler.httpRequest.path) + return method(httpRequestHandler) |