diff options
-rwxr-xr-x | ipa-client/ipa-install/ipa-client-install | 5 | ||||
-rw-r--r-- | ipalib/rpc.py | 280 | ||||
-rw-r--r-- | ipalib/session.py | 46 | ||||
-rw-r--r-- | ipapython/cookie.py | 699 | ||||
-rw-r--r-- | ipaserver/rpcserver.py | 6 | ||||
-rwxr-xr-x | make-lint | 4 | ||||
-rw-r--r-- | tests/test_ipapython/test_cookie.py | 475 |
7 files changed, 1435 insertions, 80 deletions
diff --git a/ipa-client/ipa-install/ipa-client-install b/ipa-client/ipa-install/ipa-client-install index 7f50b2a2f..7c3290298 100755 --- a/ipa-client/ipa-install/ipa-client-install +++ b/ipa-client/ipa-install/ipa-client-install @@ -42,8 +42,7 @@ try: from ipalib import api, errors from ipapython.dn import DN from ipapython.ssh import SSHPublicKey - from ipapython import kernel_keyring - from ipalib.rpc import COOKIE_NAME + from ipalib.rpc import delete_persistent_client_session_data import SSSDConfig from ConfigParser import RawConfigParser from optparse import SUPPRESS_HELP, OptionGroup @@ -1741,7 +1740,7 @@ def install(options, env, fstore, statestore): # Clear out any current session keyring information try: - kernel_keyring.del_key(COOKIE_NAME % host_principal) + delete_persistent_client_session_data(host_principal) except ValueError: pass diff --git a/ipalib/rpc.py b/ipalib/rpc.py index c555105f6..a5c5de557 100644 --- a/ipalib/rpc.py +++ b/ipalib/rpc.py @@ -37,6 +37,7 @@ import sys import os import errno import locale +import datetime from xmlrpclib import (Binary, Fault, dumps, loads, ServerProxy, Transport, ProtocolError, MININT, MAXINT) import kerberos @@ -48,8 +49,10 @@ from ipalib.errors import public_errors, PublicError, UnknownError, NetworkError from ipalib import errors from ipalib.request import context, Connection from ipalib.util import get_current_principal +from ipapython.ipa_log_manager import root_logger from ipapython import ipautil from ipapython import kernel_keyring +from ipapython.cookie import Cookie from ipalib.text import _ import httplib @@ -61,7 +64,65 @@ from ipalib.krb_utils import KRB5KDC_ERR_S_PRINCIPAL_UNKNOWN, KRB5KRB_AP_ERR_TKT KRB5_FCC_PERM, KRB5_FCC_NOFILE, KRB5_CC_FORMAT, KRB5_REALM_CANT_RESOLVE from ipapython.dn import DN -COOKIE_NAME = 'ipa_session_cookie:%s' +COOKIE_NAME = 'ipa_session' +KEYRING_COOKIE_NAME = '%s_cookie:%%s' % COOKIE_NAME + + +def client_session_keyring_keyname(principal): + ''' + Return the key name used for storing the client session data for + the given principal. + ''' + + return KEYRING_COOKIE_NAME % principal + +def update_persistent_client_session_data(principal, data): + ''' + Given a principal create or update the session data for that + principal in the persistent secure storage. + + Raises ValueError if unable to perform the action for any reason. + ''' + + try: + keyname = client_session_keyring_keyname(principal) + except Exception, e: + raise ValueError(str(e)) + + # kernel_keyring only raises ValueError (why??) + kernel_keyring.update_key(keyname, data) + +def read_persistent_client_session_data(principal): + ''' + Given a principal return the stored session data for that + principal from the persistent secure storage. + + Raises ValueError if unable to perform the action for any reason. + ''' + + try: + keyname = client_session_keyring_keyname(principal) + except Exception, e: + raise ValueError(str(e)) + + # kernel_keyring only raises ValueError (why??) + return kernel_keyring.read_key(keyname) + +def delete_persistent_client_session_data(principal): + ''' + Given a principal remove the session data for that + principal from the persistent secure storage. + + Raises ValueError if unable to perform the action for any reason. + ''' + + try: + keyname = client_session_keyring_keyname(principal) + except Exception, e: + raise ValueError(str(e)) + + # kernel_keyring only raises ValueError (why??) + kernel_keyring.del_key(keyname) def xml_wrap(value): """ @@ -310,9 +371,9 @@ class KerbTransport(SSLTransport): if not isinstance(extra_headers, list): extra_headers = [] - session_data = getattr(context, 'session_data', None) - if session_data: - extra_headers.append(('Cookie', session_data)) + session_cookie = getattr(context, 'session_cookie', None) + if session_cookie: + extra_headers.append(('Cookie', session_cookie)) return (host, extra_headers, x509) # Set the remote host principal @@ -345,16 +406,55 @@ class KerbTransport(SSLTransport): finally: self.close() + def store_session_cookie(self, cookie_header): + ''' + Given the contents of a Set-Cookie header scan the header and + extract each cookie contained within until the session cookie + is located. Examine the session cookie if the domain and path + are specified, if not update the cookie with those values from + the request URL. Then write the session cookie into the key + store for the principal. If the cookie header is None or the + session cookie is not present in the header no action is + taken. + + Context Dependencies: + + The per thread context is expected to contain: + principal + The current pricipal the HTTP request was issued for. + request_url + The URL of the HTTP request. + + ''' + + if cookie_header is None: + return + + principal = getattr(context, 'principal', None) + request_url = getattr(context, 'request_url', None) + root_logger.debug("received Set-Cookie '%s'", cookie_header) + + # Search for the session cookie + try: + session_cookie = Cookie.get_named_cookie_from_string(cookie_header, + COOKIE_NAME, request_url) + except Exception, e: + root_logger.error("unable to parse cookie header '%s': %s", cookie_header, e) + return + + if session_cookie is None: + return + + cookie_string = str(session_cookie) + root_logger.debug("storing cookie '%s' for principal %s", cookie_string, principal) + try: + update_persistent_client_session_data(principal, cookie_string) + except Exception, e: + # Not fatal, we just can't use the session cookie we were sent. + pass + def parse_response(self, response): - session_cookie = response.getheader('Set-Cookie') - if session_cookie: - principal = getattr(context, 'principal', None) - try: - kernel_keyring.update_key(COOKIE_NAME % principal, session_cookie) - except ValueError, e: - # Not fatal, we just can't use the session cookie we were - # sent. - pass + self.store_session_cookie(response.getheader('Set-Cookie')) return SSLTransport.parse_response(self, response) @@ -377,22 +477,6 @@ class xmlclient(Connectible): super(xmlclient, self).__init__() self.__errors = dict((e.errno, e) for e in public_errors) - def reconstruct_url(self): - """ - The URL directly isn't stored in the ServerProxy. We can't store - it in the connection object itself but we can reconstruct it - from the ServerProxy. - """ - if not hasattr(self.conn, '_ServerProxy__transport'): - return None - if (isinstance(self.conn._ServerProxy__transport, KerbTransport) or - isinstance(self.conn._ServerProxy__transport, DelegatedKerbTransport)): - scheme = "https" - else: - scheme = "http" - server = '%s://%s%s' % (scheme, ipautil.format_netloc(self.conn._ServerProxy__host), self.conn._ServerProxy__handler) - return server - def get_url_list(self, xmlrpc_uri): """ Create a list of urls consisting of the available IPA servers. @@ -425,40 +509,122 @@ class xmlclient(Connectible): return servers + def get_session_cookie_from_persistent_storage(self, principal): + ''' + Retrieves the session cookie for the given principal from the + persistent secure storage. Returns None if not found or unable + to retrieve the session cookie for any reason, otherwise + returns a Cookie object containing the session cookie. + ''' + + # Get the session data, it should contain a cookie string + # (possibly with more than one cookie). + try: + cookie_string = read_persistent_client_session_data(principal) + except Exception, e: + return None + + # Search for the session cookie within the cookie string + try: + session_cookie = Cookie.get_named_cookie_from_string(cookie_string, COOKIE_NAME) + except Exception, e: + return None + + return session_cookie + + def apply_session_cookie(self, url): + ''' + Attempt to load a session cookie for the current principal + from the persistent secure storage. If the cookie is + successfully loaded adjust the input url's to point to the + session path and insert the session cookie into the per thread + context for later insertion into the HTTP request. If the + cookie is not successfully loaded then the original url is + returned and the per thread context is not modified. + + Context Dependencies: + + The per thread context is expected to contain: + principal + The current pricipal the HTTP request was issued for. + + The per thread context will be updated with: + session_cookie + A cookie string to be inserted into the Cookie header + of the HTPP request. + + ''' + + original_url = url + principal = getattr(context, 'principal', None) + + session_cookie = self.get_session_cookie_from_persistent_storage(principal) + if session_cookie is None: + self.log.debug("failed to find session_cookie in persistent storage for principal '%s'", + principal) + return original_url + else: + self.debug("found session_cookie in persistent storage for principal '%s', cookie: '%s'", + principal, session_cookie) + + # Decide if we should send the cookie to the server + try: + session_cookie.http_return_ok(original_url) + except Cookie.Expired, e: + self.debug("deleting session data for principal '%s': %s", principal, e) + try: + delete_persistent_client_session_data(principal) + except Exception, e: + pass + return original_url + except Cookie.URLMismatch, e: + self.debug("not sending session cookie, URL mismatch: %s", e) + return original_url + except Exception, e: + self.error("not sending session cookie, unknown error: %s", e) + return original_url + + # O.K. session_cookie is valid to be returned, stash it away where it will will + # get included in a HTTP Cookie headed sent to the server. + self.log.debug("setting session_cookie into context '%s'", session_cookie.http_cookie()) + setattr(context, 'session_cookie', session_cookie.http_cookie()) + + # Form the session URL by substituting the session path into the original URL + scheme, netloc, path, params, query, fragment = urlparse.urlparse(original_url) + path = '/ipa/session/xml' + session_url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment)) + + return session_url + def create_connection(self, ccache=None, verbose=False, fallback=True, delegate=False): try: - session = False - session_data = None xmlrpc_uri = self.env.xmlrpc_uri principal = get_current_principal() setattr(context, 'principal', principal) # We have a session cookie, try using the session URI to see if it # is still valid if not delegate: - session_data = kernel_keyring.read_key(COOKIE_NAME % principal) - setattr(context, 'session_data', session_data) - (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri) - xmlrpc_uri = urlparse.urlunparse((scheme, netloc, '/ipa/session/xml', params, query, fragment)) - session = True + xmlrpc_uri = self.apply_session_cookie(xmlrpc_uri) except ValueError: # No session key, do full Kerberos auth pass - servers = self.get_url_list(xmlrpc_uri) + urls = self.get_url_list(xmlrpc_uri) serverproxy = None - for server in servers: + for url in urls: kw = dict(allow_none=True, encoding='UTF-8') kw['verbose'] = verbose - if server.startswith('https://'): + if url.startswith('https://'): if delegate: kw['transport'] = DelegatedKerbTransport() else: kw['transport'] = KerbTransport() else: kw['transport'] = LanguageAwareTransport() - self.log.info('trying %s' % server) - serverproxy = ServerProxy(server, **kw) - if len(servers) == 1: + self.log.info('trying %s' % url) + setattr(context, 'request_url', url) + serverproxy = ServerProxy(url, **kw) + if len(urls) == 1: # if we have only 1 server and then let the # main requester handle any errors. This also means it # must handle a 401 but we save a ping. @@ -476,7 +642,7 @@ class xmlclient(Connectible): raise UnknownError( code=e.faultCode, error=e.faultString, - server=server, + server=url, ) # We don't care about the response, just that we got one break @@ -484,14 +650,13 @@ class xmlclient(Connectible): # kerberos error on one server is likely on all raise errors.KerberosError(major=str(krberr), minor='') except ProtocolError, e: - if session_data and e.errcode == 401: + if hasattr(context, 'session_cookie') and e.errcode == 401: # Unauthorized. Remove the session and try again. - delattr(context, 'session_data') + delattr(context, 'session_cookie') try: - kernel_keyring.del_key(COOKIE_NAME % principal) - except ValueError: - # This shouldn't happen if we have a session but - # it isn't fatal. + delete_persistent_client_session_data(principal) + except Exception, e: + # This shouldn't happen if we have a session but it isn't fatal. pass return self.create_connection(ccache, verbose, fallback, delegate) if not fallback: @@ -504,7 +669,7 @@ class xmlclient(Connectible): if serverproxy is None: raise NetworkError(uri=_('any of the configured servers'), - error=', '.join(servers)) + error=', '.join(urls)) return serverproxy def destroy_connection(self): @@ -529,7 +694,7 @@ class xmlclient(Connectible): raise ValueError( '%s.forward(): %r not in api.Command' % (self.name, name) ) - server = self.reconstruct_url() + server = getattr(context, 'request_url', None) self.info('Forwarding %r to server %r', name, server) command = getattr(self.conn, name) params = [args, kw] @@ -554,16 +719,15 @@ class xmlclient(Connectible): # By catching a 401 here we can detect the case where we have # a single IPA server and the session is invalid. Otherwise # we always have to do a ping(). - session_data = getattr(context, 'session_data', None) - if session_data and e.errcode == 401: + session_cookie = getattr(context, 'session_cookie', None) + if session_cookie and e.errcode == 401: # Unauthorized. Remove the session and try again. - delattr(context, 'session_data') + delattr(context, 'session_cookie') try: principal = getattr(context, 'principal', None) - kernel_keyring.del_key(COOKIE_NAME % principal) - except ValueError: - # This shouldn't happen if we have a session but - # it isn't fatal. + delete_persistent_client_session_data(principal) + except Exception, e: + # This shouldn't happen if we have a session but it isn't fatal. pass # Create a new serverproxy with the non-session URI. If there diff --git a/ipalib/session.py b/ipalib/session.py index 36beececd..68b9b264b 100644 --- a/ipalib/session.py +++ b/ipalib/session.py @@ -17,17 +17,18 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. import memcache -import Cookie import random import errors import os import re import time +from urllib2 import urlparse from text import _ from ipapython.ipa_log_manager import * from ipalib import api, errors from ipalib import Command from ipalib.krb_utils import * +from ipapython.cookie import Cookie __doc__ = ''' Session Support for IPA @@ -954,14 +955,24 @@ class MemcacheSessionManager(SessionManager): :returns: Session id as string or None if not found. ''' + + if cookie_header is None: + return None + session_id = None - if cookie_header is not None: - cookie = Cookie.SimpleCookie() - cookie.load(cookie_header) - session_cookie = cookie.get(self.session_cookie_name) - if session_cookie is not None: - session_id = session_cookie.value - self.debug('found session cookie_id = %s', session_id) + + try: + session_cookie = Cookie.get_named_cookie_from_string(cookie_header, self.session_cookie_name) + except Exception, e: + session_cookie = None + if session_cookie: + session_id = session_cookie.value + + if session_id is None: + self.debug('no session cookie found') + else: + self.debug('found session cookie_id = %s', session_id) + return session_id @@ -1050,7 +1061,7 @@ class MemcacheSessionManager(SessionManager): self.mc.set(session_key, session_data, time=session_expiration_timestamp) return session_id - def generate_cookie(self, url_path, session_id, add_header=False): + def generate_cookie(self, url_path, session_id, expiration=None, add_header=False): ''' Return a session cookie containing the session id. The cookie will be contrainted to the url path, defined for use @@ -1068,15 +1079,18 @@ class MemcacheSessionManager(SessionManager): :returns: cookie string ''' - cookie = Cookie.SimpleCookie() - cookie[self.session_cookie_name] = session_id - cookie[self.session_cookie_name]['path'] = url_path - cookie[self.session_cookie_name]['httponly'] = True - cookie[self.session_cookie_name]['secure'] = True + + if not expiration: # Catch zero unix timestamps + expiration = None; + + cookie = Cookie(self.session_cookie_name, session_id, + domain=urlparse.urlparse(api.env.xmlrpc_uri).netloc, + path=url_path, httponly=True, secure=True, + expires=expiration) if add_header: - result = cookie.output().strip() + result = 'Set-Cookie: %s' % cookie else: - result = cookie.output(header='').strip() + result = str(cookie) return result diff --git a/ipapython/cookie.py b/ipapython/cookie.py new file mode 100644 index 000000000..b45cb2b11 --- /dev/null +++ b/ipapython/cookie.py @@ -0,0 +1,699 @@ +# Authors: +# John Dennis <jdennis@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import re +import time +import datetime +from urllib2 import urlparse +from calendar import timegm +from ipapython.ipa_log_manager import log_mgr + +''' +Core Python has two cookie libraries, Cookie.py targeted to server +side and cookielib.py targeted to client side. So why this module and +not use the standard libraries? + +Cookie.py has some serious bugs, it cannot correctly parse the +HttpOnly, Secure, and Expires cookie attributes (more of a client side +need and not what it was designed for). Since we utilize those +attributes that makes Cookie.py a non-starter. Plus it's API awkard +and limited (we would have to build more on top of it). + +The Cookie.py bug reports are: + +http://bugs.python.org/issue3073 +http://bugs.python.org/issue16611 + +cookielib.py has a lot of good featuress, a nice API and covers all +the relevant RFC's as well as actual practice in the field. However +cookielib.py is tighly integrated with urllib2 and it's not possible +to use most of the features of cookielib without simultaneously using +urllib2. Unfortunataely we only use httplib because of our dependency +on xmlrpclib. Without urllib2 cookielib is a non-starter. + +This module is a minimal implementation of Netscape cookies which +works equally well on either the client or server side. It's API is +easy to use with cookie attributes as class properties which can be +read or set easily. The Cookie object automatically converts Expires +and Max-Age attributes into datetime objects for easy time +comparision. Cookies in strings can easily be parsed, including +multiple cookies in the HTTP_COOKIE envionment variable. + +The cookie RFC is silent on any escaping requirements for cookie +contents as such this module does not provide any automated support +escaping and unescapin. + +''' + +#------------------------------------------------------------------------------- + +# FIXME: The use of properties for the attributes timestamp, expires +# and max_age produce a pylint error which is a false positive, this +# is a known bug in pylint (http://www.logilab.org/ticket/89092, +# http://www.logilab.org/ticket/89786) after the pylint bug is fixed +# the disables for E0202 should be removed. + +class Cookie(object): + ''' + A Cookie object has the following attributes: + + key + The name of the cookie + value + The value of the cookie + + A Cookie also supports these predefined optional attributes. If an + optional attribute is not set on the cookie it's value is None. + + domain + Restrict cookie usage to this domain + path + Restrict cookie usage to this path or below + expires + Cookie is invalid after this UTC timestamp + max_age + Cookie is invalid this many seconds in the future. + Has precedence over the expires attribute. + secure + Cookie should only be returned on secure (i.e. SSL/TLS) + connections. + httponly + Cookie is intended only for HTTP communication, it can + never be utilized in any other context (e.g. browser + Javascript). + + See the documentation of get_expiration() for an explanation of + how the expires and max-age attributes interact as well as the + role of the timestamp attribute. Expiration values are stored as + datetime objects for easy manipulation and comparision. + + There are two ways to instantiate a Cookie object. Either directly + via the constructor or by calling the class function parse() which + returns a list of Cookie objects found in a string. + + To create a cookie to sent to a client: + + Example: + + cookie = Cookie('session', session_id, + domain=my_domain, path=mypath, + httpOnly=True, secure=True, expires=expiration) + headers.append(('Set-Cookie', str(cookie))) + + + To receive cookies from a request: + + Example: + + cookies = Cookie.parse(response.getheader('Set-Cookie'), request_url) + + ''' + + class Expired(ValueError): + pass + + class URLMismatch(ValueError): + pass + + # regexp to split fields at a semi-colon + field_re = re.compile(r';\s*') + + # regexp to locate a key/value pair + kv_pair_re = re.compile(r'^\s*([a-zA-Z0-9\!\#\$\%\&\'\*\+\-\.\^\_\`\|\~]+)\s*=\s*(.*?)\s*$', re.IGNORECASE) + + # Reserved attribute names, maps from lower case protocol name to + # object attribute name + attrs = {'domain' : 'domain', + 'path' : 'path', + 'max-age' : 'max_age', + 'expires' : 'expires', + 'secure' : 'secure', + 'httponly' : 'httponly'} + + @classmethod + def datetime_to_time(cls, dt): + ''' + Timestamps (timestamp & expires) are stored as datetime + objects in UTC. It's non-obvious how to convert a naive UTC + datetime into a unix time value (seconds since the epoch + UTC). That functionality is oddly missing from the datetime + and time modules. This utility provides that missing + functionality. + ''' + # Use timegm from the calendar module + return timegm(dt.utctimetuple()) + + @classmethod + def datetime_to_string(cls, dt=None): + ''' + Given a datetime object in UTC generate RFC 1123 date string. + ''' + + # Try to verify dt is specified as UTC. If utcoffset is not + # available we'll just have to assume the caller is using the + # correct timezone. + utcoffset = dt.utcoffset() + if utcoffset is not None and utcoffset.total_seconds() != 0.0: + raise ValueError("timezone is not UTC") + + # At this point we've validated as much as possible the + # timezone is UTC or GMT but we can't use the %Z timezone + # format specifier because the timezone in the string must be + # 'GMT', not something equivalent to GMT, so hardcode the GMT + # timezone string into the format. + + return datetime.datetime.strftime(dt, '%a, %d %b %Y %H:%M:%S GMT') + + @classmethod + def parse_datetime(cls, s): + ''' + Parse a RFC 822, RFC 1123 date string, return a datetime aware object in UTC. + Accommodates some non-standard formats found in the wild. + ''' + + formats = ['%a, %d %b %Y %H:%M:%S', + '%a, %d-%b-%Y %H:%M:%S', + '%a, %d-%b-%y %H:%M:%S', + '%a, %d %b %y %H:%M:%S', + ] + s = s.strip() + + # strptime does not read the time zone and generate a tzinfo + # object to insert in the datetime object so there is little point + # in specifying a %Z format, instead verify GMT is specified and + # generate the datetime object as if it were UTC. + + if not s.endswith(' GMT'): + raise ValueError("http date string '%s' does not end with GMT time zone" % s) + s = s[:-4] + + dt = None + for format in formats: + try: + dt = datetime.datetime(*(time.strptime(s, format)[0:6])) + break + except Exception: + continue + + if dt is None: + raise ValueError("unable to parse expires datetime '%s'" % s) + + return dt + + @classmethod + def normalize_url_path(cls, url_path): + ''' + Given a URL path, possibly empty, return a path consisting + only of directory components. The URL path must end with a + trailing slash for the last path element to be considered a + directory. Also the URL path must begin with a slash. Empty + input returns '/'. + + Examples: + + '' -> '/' + '/' -> '/' + 'foo' -> '/' + 'foo/' -> '/' + '/foo -> '/' + '/foo/' -> '/foo' + '/foo/bar' -> '/foo' + '/foo/bar/' -> '/foo/bar' + ''' + url_path = url_path.lower() + + if not url_path: + return '/' + + if not url_path.startswith('/'): + return '/' + + if url_path.count('/') <= 1: + return'/' + + return url_path[:url_path.rindex('/')] + + + @classmethod + def parse(cls, cookie_string, request_url=None): + ''' + Given a string containing one or more cookies (the + HTTP_COOKIES environment variable typically contains multiple + cookies) parse the string and return a list of Cookie objects + found in the string. + ''' + + # Our list of returned cookies + cookies = [] + + # Split the input string at semi-colon boundaries, we call this a + # field. A field may either be a single keyword or a key=value + # pair. + fields = Cookie.field_re.split(cookie_string) + + # The input string may have multiple cookies inside it. This is + # common when the string comes from a HTTP_COOKIE environment + # variable. All the cookies will be contenated, separated by a + # semi-colon. Semi-colons are also the separator between + # attributes in a cookie. + # + # To distinguish between two adjacent cookies in a string we + # have to locate the key=value pair at the start of a + # cookie. Unfortunately cookies have attributes that also look + # like key/value pairs, the only way to distinguish a cookie + # attribute from a cookie is the fact the attribute names are + # reserved. A cookie attribute may either be a key/value pair + # or a single key (e.g. HttpOnly). As we scan the cookie we + # first identify the key=value (cookie name, cookie + # value). Then we continue scanning, if a bare key or + # key/value pair follows and is a known reserved keyword than + # that's an attribute belonging to the current cookie. As soon + # as we see a key/value pair whose key is not reserved we know + # we've found a new cookie. Bare keys (no value) can never + # start a new cookie. + + # Iterate over all the fields and emit a new cookie whenever the + # next field is not a known attribute. + cookie = None + for field in fields: + match = Cookie.kv_pair_re.search(field) + if match: + key = match.group(1) + value = match.group(2) + # Double quoted value? + if value[0] == '"': + if value[-1] == '"': + value = value[1:-1] + else: + raise ValueError("unterminated quote in '%s'" % value) + kv_pair = True + else: + key = field + value = True # True because bare keys are boolean flags + kv_pair = False + + is_attribute = key.lower() in Cookie.attrs + + # First cookie found, create new cookie object + if cookie is None and kv_pair and not is_attribute: + cookie = Cookie(key, value) + + # If start of new cookie then flush previous cookie and create + # a new one (it's a new cookie because it's a key/value pair + # whose key is not a reserved keyword). + elif cookie and kv_pair and not is_attribute: + if request_url is not None: + cookie.normalize(request_url) + cookies.append(cookie) + cookie = Cookie(key, value) + + # If it's a reserved keyword add that as an attribute to the + # current cookie being scanned. + elif cookie and is_attribute: + cookie.__set_attr(key, value) + # If we've found a non-empty single token that's not a + # reserved keyword it's an error. An empty token can occur + # when there are two adjacent semi-colons (i.e. "; ;"). + # We don't consider empty tokens an error. + elif key: + raise ValueError("unknown cookie token '%s'" % key) + + # Flush out final cookie + if cookie: + if request_url is not None: + cookie.normalize(request_url) + cookies.append(cookie) + + return cookies + + @classmethod + def get_named_cookie_from_string(cls, cookie_string, cookie_name, request_url=None): + ''' + A cookie string may contain multiple cookies, parse the cookie + string and return the last cookie in the string matching the + cookie name or None if not found. + + This is basically a utility wrapper around the parse() class + method which iterates over what parse() returns looking for + the specific cookie. + + When cookie_name appears more than once the last instance is + returned rather than the first because the ordering sequence + makes the last instance the current value. + ''' + + target_cookie = None + + cookies = cls.parse(cookie_string) + for cookie in cookies: + if cookie.key == cookie_name: + target_cookie = cookie + + if request_url is not None: + target_cookie.normalize(request_url) + return target_cookie + + + def __init__(self, key, value, domain=None, path=None, max_age=None, expires=None, + secure=None, httponly=None, timestamp=None): + + log_mgr.get_logger(self, True) + + self.key = key + self.value = value + self.domain = domain + self.path = path + self.max_age = max_age + self.expires = expires + self.secure = secure + self.httponly = httponly + self.timestamp = timestamp + + @property + def timestamp(self): #pylint: disable=E0202 + ''' + The UTC moment at which cookie was received for purposes of + computing the expiration given a Max-Age offset. The + expiration will be timestamp + max_age. The timestamp value + will aways be a datetime object. + + By default the timestamp will be the moment the Cookie object + is created as this often corresponds to the moment the cookie + is received (the intent of the Max-Age attribute). But becuase + it's sometimes desirable to force a specific moment for + purposes of computing the expiration from the Max-Age the + Cookie timestamp can be updated. + + Setting a value of None causes the timestamp to be set to the + current UTC time (now). You may also assign with a numeric + UNIX timestamp (seconds since the epoch UTC) or a formatted time + sting, in all cases the value will be converted to a datetime + object. + ''' + return self._timestamp + + @timestamp.setter + def timestamp(self, value): #pylint: disable=E0202 + if value is None: + self._timestamp = None + elif isinstance(value, datetime.datetime): + self._timestamp = value + elif isinstance(value, (int, long, float)): + self._timestamp = datetime.datetime.utcfromtimestamp(value) + elif isinstance(value, basestring): + self._timestamp = Cookie.parse_datetime(value) + else: + raise TypeError('value must be datetime, int, long, float, basestring or None, not %s' % \ + value.__class__.__name__) + + @property + def expires(self): #pylint: disable=E0202 + ''' + The expiration timestamp (in UTC) as a datetime object for the + cookie, or None if not set. + + You may assign a value of None, a datetime object, a numeric + UNIX timestamp (seconds since the epoch UTC) or formatted time + string (the latter two will be converted to a datetime object. + ''' + return self._expires + + @expires.setter + def expires(self, value): #pylint: disable=E0202 + if value is None: + self._expires = None + elif isinstance(value, datetime.datetime): + self._expires = value + elif isinstance(value, (int, long, float)): + self._expires = datetime.datetime.utcfromtimestamp(value) + elif isinstance(value, basestring): + self._expires = Cookie.parse_datetime(value) + else: + raise TypeError('value must be datetime, int, long, float, basestring or None, not %s' % \ + value.__class__.__name__) + + @property + def max_age(self): #pylint: disable=E0202 + ''' + The lifetime duration of the cookie. Computed as an offset + from the cookie's timestamp. + ''' + return self._max_age + + @max_age.setter + def max_age(self, value): #pylint: disable=E0202 + if value is None: + self._max_age = None + else: + try: + self._max_age = int(value) + except Exception: + raise ValueError("Max-Age value '%s' not convertable to integer" % value) + + def __set_attr(self, name, value): + ''' + Sets one of the predefined cookie attributes. + ''' + attr_name = Cookie.attrs.get(name.lower(), None) + if attr_name is None: + raise ValueError("unknown cookie attribute '%s'" % name) + setattr(self, attr_name, value) + + def __str__(self): + components = [] + + components.append("%s=%s" % (self.key, self.value)) + + if self.domain is not None: + components.append("Domain=%s" % self.domain) + + if self.path is not None: + components.append("Path=%s" % self.path) + + if self.max_age is not None: + components.append("Max-Age=%s" % self.max_age) + + if self.expires is not None: + components.append("Expires=%s" % Cookie.datetime_to_string(self.expires)) + + if self.secure: + components.append("Secure") + + if self.httponly: + components.append("HttpOnly") + + return '; '.join(components) + + def get_expiration(self): + ''' + Return the effective expiration of the cookie as a datetime + object or None if no expiration is defined. Expiration may be + defined either by the "Expires" timestamp attribute or the + "Max-Age" duration attribute. If both are set "Max-Age" takes + precedence. If neither is set the cookie has no expiration and + None will be returned. + + "Max-Age" specifies the number of seconds in the future from when the + cookie is received until it expires. Effectively it means + adding "Max-Age" seconds to a timestamp to arrive at an + expiration. By default the timestamp used to mark the arrival + of the cookie is set to the moment the cookie object is + created. However sometimes it is desirable to adjust the + received timestamp to something other than the moment of + object creation, therefore you can explicitly set the arrival + timestamp used in the "Max-Age" calculation. + + "Expires" specifies an explicit timestamp. + + If "Max-Age" is set a datetime object is returned which is the + sum of the arrival timestamp and "Max-Age". + + If "Expires" is set a datetime object is returned matching the + timestamp specified as the "Expires" value. + + If neither is set None is returned. + ''' + + if self.max_age is not None: + return self.timestamp + datetime.timedelta(seconds=self.max_age) + + if self.expires is not None: + return self.expires + + return None + + def normalize_expiration(self): + ''' + An expiration may be specified either with an explicit + timestamp in the "Expires" attribute or via an offset + specified witht the "Max-Age" attribute. The "Max-Age" + attribute has precedence over "Expires" if both are + specified. + + This method normalizes the expiration of the cookie such that + only a "Expires" attribute remains after consideration of the + "Max-Age" attribute. This is useful when storing the cookie + for future reference. + ''' + + self.expires = self.get_expiration() + self.max_age = None + return self.expires + + def set_defaults_from_url(self, url): + ''' + If cookie domain and path attributes are not specified then + they assume defaults from the request url the cookie was + received from. + ''' + + scheme, domain, path, params, query, fragment = urlparse.urlparse(url) + + if self.domain is None: + self.domain = domain.lower() + + if self.path is None: + self.path = self.normalize_url_path(path) + + + def normalize(self, url): + ''' + Missing cookie attributes will receive default values derived + from the request URL. The expiration value is normalized. + ''' + + self.set_defaults_from_url(url) + self.normalize_expiration() + + def http_cookie(self): + ''' + Return a string with just the key and value (no attributes). + This is appropriate for including in a HTTP Cookie header. + ''' + return '%s=%s;' % (self.key, self.value) + + def http_return_ok(self, url): + ''' + Tests to see if a cookie should be returned when a request is + sent to a specific URL. + + * The request url's host must match the cookie's doman + otherwise raises Cookie.URLMismatch. + + * The path in the request url must contain the cookie's path + otherwise raises Cookie.URLMismatch. + + * If the cookie defines an expiration date then the current + time must be less or equal to the cookie's expiration + timestamp. Will raise Cookie.Expired if a defined expiration + is not valid. + + If the test fails Cookie.Expired or Cookie.URLMismatch will be raised, + otherwise True is returned. + + ''' + + def domain_valid(url_domain, cookie_domain): + ''' + Compute domain component and perform test per + RFC 6265, Section 5.1.3. "Domain Matching" + ''' + # FIXME: At the moment we can't import from ipalib at the + # module level because of a dependency loop (cycle) in the + # import. Our module layout needs to be refactored. + from ipalib.util import validate_domain_name + try: + validate_domain_name(url_domain) + except Exception, e: + return False + + if cookie_domain is None: + return True + + url_domain = url_domain.lower() + cookie_domain = cookie_domain.lower() + + if url_domain == cookie_domain: + return True + + if url_domain.endswith(cookie_domain): + if cookie_domain.startswith('.'): + return True + + return False + + def path_valid(url_path, cookie_path): + ''' + Compute path component and perform test per + RFC 6265, Section 5.1.4. "Paths and Path-Match" + ''' + + if cookie_path is None: + return True + + cookie_path = cookie_path.lower() + request_path = self.normalize_url_path(url_path) + + if cookie_path == request_path: + return True + + if cookie_path and request_path.startswith(cookie_path): + if cookie_path.endswith('/'): + return True + + tail = request_path[len(cookie_path):] + if tail.startswith('/'): + return True + + return False + + cookie_name = self.key + + url_scheme, url_domain, url_path, url_params, url_query, url_fragment = urlparse.urlparse(url) + + cookie_expiration = self.get_expiration() + if cookie_expiration is not None: + now = datetime.datetime.utcnow() + if cookie_expiration < now: + raise Cookie.Expired("cookie named '%s'; expired at %s'" % \ + (cookie_name, + self.datetime_to_string(cookie_expiration))) + + if not domain_valid(url_domain, self.domain): + raise Cookie.URLMismatch("cookie named '%s'; it's domain '%s' does not match URL domain '%s'" % \ + (cookie_name, self.domain, url_domain)) + + if not path_valid(url_path, self.path): + raise Cookie.URLMismatch("cookie named '%s'; it's path '%s' does not contain the URL path '%s'" % \ + (cookie_name, self.path, url_path)) + + url_scheme = url_scheme.lower() + + if self.httponly: + if url_scheme not in ('http', 'https'): + raise Cookie.URLMismatch("cookie named '%s'; is restricted to HTTP but it's URL scheme is '%s'" % \ + (cookie_name, url_scheme)) + + if self.secure: + if url_scheme not in ('https',): + raise Cookie.URLMismatch("cookie named '%s'; is restricted to secure transport but it's URL scheme is '%s'" % \ + (cookie_name, url_scheme)) + + + return True diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py index d2f2acd92..8bce48bea 100644 --- a/ipaserver/rpcserver.py +++ b/ipaserver/rpcserver.py @@ -384,7 +384,8 @@ class WSGIExecutioner(Executioner): if session_data is not None: # Send session cookie back and store session data # FIXME: the URL path should be retreived from somewhere (but where?), not hardcoded - session_cookie = session_mgr.generate_cookie('/ipa', session_data['session_id']) + session_cookie = session_mgr.generate_cookie('/ipa', session_data['session_id'], + session_data['session_expiration_timestamp']) headers.append(('Set-Cookie', session_cookie)) start_response(status, headers) @@ -666,7 +667,8 @@ class KerberosSession(object): release_ipa_ccache(ccache_name) # Return success and set session cookie - session_cookie = session_mgr.generate_cookie('/ipa', session_id) + session_cookie = session_mgr.generate_cookie('/ipa', session_id, + session_data['session_expiration_timestamp']) headers.append(('Set-Cookie', session_cookie)) start_response(HTTP_STATUS_SUCCESS, headers) @@ -64,7 +64,8 @@ class IPATypeChecker(TypeChecker): 'pattern', 'pattern_errmsg'], 'ipalib.parameters.Enum': ['values'], 'ipalib.parameters.File': ['stdin_if_missing'], - 'urlparse.SplitResult': ['netloc'], + 'urlparse.SplitResult': ['scheme', 'netloc', 'path', 'query', 'fragment', 'username', 'password', 'hostname', 'port'], + 'urlparse.ParseResult': ['scheme', 'netloc', 'path', 'params', 'query', 'fragment', 'username', 'password', 'hostname', 'port'], 'ipaserver.install.ldapupdate.LDAPUpdate' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], 'ipaserver.plugins.ldap2.SchemaCache' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], 'ipaserver.plugins.ldap2.IPASimpleLDAPObject' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], @@ -78,6 +79,7 @@ class IPATypeChecker(TypeChecker): 'ipalib.session.SessionCCache' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], 'ipalib.session.MemcacheSessionManager' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], 'ipapython.admintool.AdminTool' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], + 'ipapython.cookie.Cookie' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'], } def _related_classes(self, klass): diff --git a/tests/test_ipapython/test_cookie.py b/tests/test_ipapython/test_cookie.py new file mode 100644 index 000000000..f8c5daf41 --- /dev/null +++ b/tests/test_ipapython/test_cookie.py @@ -0,0 +1,475 @@ +# Authors: +# John Dennis <jdennis@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import time +import datetime +import calendar +from ipapython.cookie import Cookie + +class TestParse(unittest.TestCase): + + def test_parse(self): + # Empty string + s = '' + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 0) + + # Invalid single token + s = 'color' + with self.assertRaises(ValueError): + cookies = Cookie.parse(s) + + # Invalid single token that's keyword + s = 'HttpOnly' + with self.assertRaises(ValueError): + cookies = Cookie.parse(s) + + # Invalid key/value pair whose key is a keyword + s = 'domain=example.com' + with self.assertRaises(ValueError): + cookies = Cookie.parse(s) + + # 1 cookie with name/value + s = 'color=blue' + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue") + self.assertEqual(cookie.http_cookie(), "color=blue;") + + # 1 cookie with whose value is quoted + # Use "get by name" utility to extract specific cookie + s = 'color="blue"' + cookie = Cookie.get_named_cookie_from_string(s, 'color') + self.assertIsNotNone(cookie) + self.assertIsNotNone(cookie, Cookie) + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue") + self.assertEqual(cookie.http_cookie(), "color=blue;") + + # 1 cookie with name/value and domain, path attributes. + # Change up the whitespace a bit. + s = 'color =blue; domain= example.com ; path = /toplevel ' + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/toplevel') + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue; Domain=example.com; Path=/toplevel") + self.assertEqual(cookie.http_cookie(), "color=blue;") + + # 2 cookies, various attributes + s = 'color=blue; Max-Age=3600; temperature=hot; HttpOnly' + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 2) + cookie = cookies[0] + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, 3600) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue; Max-Age=3600") + self.assertEqual(cookie.http_cookie(), "color=blue;") + cookie = cookies[1] + self.assertEqual(cookie.key, 'temperature') + self.assertEqual(cookie.value, 'hot') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, True) + self.assertEqual(str(cookie), "temperature=hot; HttpOnly") + self.assertEqual(cookie.http_cookie(), "temperature=hot;") + +class TestExpires(unittest.TestCase): + + def setUp(self): + # Force microseconds to zero because cookie timestamps only have second resolution + self.now = datetime.datetime.utcnow().replace(microsecond=0) + self.now_timestamp = calendar.timegm(self.now.utctimetuple()) + self.now_string = datetime.datetime.strftime(self.now, '%a, %d %b %Y %H:%M:%S GMT') + + self.max_age = 3600 # 1 hour + self.age_expiration = self.now + datetime.timedelta(seconds=self.max_age) + self.age_string = datetime.datetime.strftime(self.age_expiration, '%a, %d %b %Y %H:%M:%S GMT') + + self.expires = self.now + datetime.timedelta(days=1) # 1 day + self.expires_timestamp = calendar.timegm(self.expires.utctimetuple()) + self.expires_string = datetime.datetime.strftime(self.expires, '%a, %d %b %Y %H:%M:%S GMT') + + def test_expires(self): + # 1 cookie with name/value and no Max-Age and no Expires + s = 'color=blue;' + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + # Force timestamp to known value + cookie.timestamp = self.now + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue") + self.assertEqual(cookie.get_expiration(), None) + # Normalize + self.assertEqual(cookie.normalize_expiration(), None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(str(cookie), "color=blue") + + # 1 cookie with name/value and Max-Age + s = 'color=blue; max-age=%d' % (self.max_age) + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + # Force timestamp to known value + cookie.timestamp = self.now + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, self.max_age) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue; Max-Age=%d" % (self.max_age)) + self.assertEqual(cookie.get_expiration(), self.age_expiration) + # Normalize + self.assertEqual(cookie.normalize_expiration(), self.age_expiration) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, self.age_expiration) + self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.age_string)) + + + # 1 cookie with name/value and Expires + s = 'color=blue; Expires=%s' % (self.expires_string) + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + # Force timestamp to known value + cookie.timestamp = self.now + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, self.expires) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.expires_string)) + self.assertEqual(cookie.get_expiration(), self.expires) + # Normalize + self.assertEqual(cookie.normalize_expiration(), self.expires) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, self.expires) + self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.expires_string)) + + # 1 cookie with name/value witht both Max-Age and Expires, Max-Age takes precedence + s = 'color=blue; Expires=%s; max-age=%d' % (self.expires_string, self.max_age) + cookies = Cookie.parse(s) + self.assertEqual(len(cookies), 1) + cookie = cookies[0] + # Force timestamp to known value + cookie.timestamp = self.now + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, self.max_age) + self.assertEqual(cookie.expires, self.expires) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue; Max-Age=%d; Expires=%s" % (self.max_age, self.expires_string)) + self.assertEqual(cookie.get_expiration(), self.age_expiration) + # Normalize + self.assertEqual(cookie.normalize_expiration(), self.age_expiration) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, self.age_expiration) + self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.age_string)) + + # Verify different types can be assigned to the timestamp and + # expires attribute. + + cookie = Cookie('color', 'blue') + cookie.timestamp = self.now + self.assertEqual(cookie.timestamp, self.now) + cookie.timestamp = self.now_timestamp + self.assertEqual(cookie.timestamp, self.now) + cookie.timestamp = self.now_string + self.assertEqual(cookie.timestamp, self.now) + + self.assertEqual(cookie.expires, None) + + cookie.expires = self.expires + self.assertEqual(cookie.expires, self.expires) + cookie.expires = self.expires_timestamp + self.assertEqual(cookie.expires, self.expires) + cookie.expires = self.expires_string + self.assertEqual(cookie.expires, self.expires) + +class TestInvalidAttributes(unittest.TestCase): + def test_invalid(self): + # Invalid Max-Age + s = 'color=blue; Max-Age=over-the-hill' + with self.assertRaises(ValueError): + cookies = Cookie.parse(s) + + cookie = Cookie('color', 'blue') + with self.assertRaises(ValueError): + cookie.max_age = 'over-the-hill' + + # Invalid Expires + s = 'color=blue; Expires=Sun, 06 Xxx 1994 08:49:37 GMT' + with self.assertRaises(ValueError): + cookies = Cookie.parse(s) + + cookie = Cookie('color', 'blue') + with self.assertRaises(ValueError): + cookie.expires = 'Sun, 06 Xxx 1994 08:49:37 GMT' + + +class TestAttributes(unittest.TestCase): + def test_attributes(self): + cookie = Cookie('color', 'blue') + self.assertEqual(cookie.key, 'color') + self.assertEqual(cookie.value, 'blue') + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + self.assertEqual(cookie.max_age, None) + self.assertEqual(cookie.expires, None) + self.assertEqual(cookie.secure, None) + self.assertEqual(cookie.httponly, None) + + cookie.domain = 'example.com' + self.assertEqual(cookie.domain, 'example.com') + cookie.domain = None + self.assertEqual(cookie.domain, None) + + cookie.path = '/toplevel' + self.assertEqual(cookie.path, '/toplevel') + cookie.path = None + self.assertEqual(cookie.path, None) + + cookie.max_age = 400 + self.assertEqual(cookie.max_age, 400) + cookie.max_age = None + self.assertEqual(cookie.max_age, None) + + cookie.expires = 'Sun, 06 Nov 1994 08:49:37 GMT' + self.assertEqual(cookie.expires, datetime.datetime(1994, 11, 6, 8, 49, 37)) + cookie.expires = None + self.assertEqual(cookie.expires, None) + + cookie.secure = True + self.assertEqual(cookie.secure, True) + self.assertEqual(str(cookie), "color=blue; Secure") + cookie.secure = False + self.assertEqual(cookie.secure, False) + self.assertEqual(str(cookie), "color=blue") + cookie.secure = None + self.assertEqual(cookie.secure, None) + self.assertEqual(str(cookie), "color=blue") + + cookie.httponly = True + self.assertEqual(cookie.httponly, True) + self.assertEqual(str(cookie), "color=blue; HttpOnly") + cookie.httponly = False + self.assertEqual(cookie.httponly, False) + self.assertEqual(str(cookie), "color=blue") + cookie.httponly = None + self.assertEqual(cookie.httponly, None) + self.assertEqual(str(cookie), "color=blue") + + +class TestHTTPReturn(unittest.TestCase): + def setUp(self): + self.url = 'http://www.foo.bar.com/one/two' + + def test_no_attributes(self): + cookie = Cookie('color', 'blue') + self.assertTrue(cookie.http_return_ok(self.url)) + + def test_domain(self): + cookie = Cookie('color', 'blue', domain='www.foo.bar.com') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', domain='.foo.bar.com') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', domain='.bar.com') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', domain='bar.com') + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', domain='bogus.com') + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', domain='www.foo.bar.com') + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok('http://192.168.1.1/one/two')) + + def test_path(self): + cookie = Cookie('color', 'blue') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', path='/') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', path='/one') + self.assertTrue(cookie.http_return_ok(self.url)) + + cookie = Cookie('color', 'blue', path='/oneX') + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok(self.url)) + + def test_expires(self): + now = datetime.datetime.utcnow().replace(microsecond=0) + + # expires 1 day from now + expires = now + datetime.timedelta(days=1) + + cookie = Cookie('color', 'blue', expires=expires) + self.assertTrue(cookie.http_return_ok(self.url)) + + # expired 1 day ago + expires = now + datetime.timedelta(days=-1) + cookie = Cookie('color', 'blue', expires=expires) + with self.assertRaises(Cookie.Expired): + self.assertTrue(cookie.http_return_ok(self.url)) + + + def test_httponly(self): + cookie = Cookie('color', 'blue', httponly=True) + self.assertTrue(cookie.http_return_ok('http://example.com')) + self.assertTrue(cookie.http_return_ok('https://example.com')) + + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok('ftp://example.com')) + + def test_secure(self): + cookie = Cookie('color', 'blue', secure=True) + self.assertTrue(cookie.http_return_ok('https://Xexample.com')) + + with self.assertRaises(Cookie.URLMismatch): + self.assertTrue(cookie.http_return_ok('http://Xexample.com')) + +class TestNormalization(unittest.TestCase): + def setUp(self): + # Force microseconds to zero because cookie timestamps only have second resolution + self.now = datetime.datetime.utcnow().replace(microsecond=0) + self.now_timestamp = calendar.timegm(self.now.utctimetuple()) + self.now_string = datetime.datetime.strftime(self.now, '%a, %d %b %Y %H:%M:%S GMT') + + self.max_age = 3600 # 1 hour + self.age_expiration = self.now + datetime.timedelta(seconds=self.max_age) + self.age_string = datetime.datetime.strftime(self.age_expiration, '%a, %d %b %Y %H:%M:%S GMT') + + self.expires = self.now + datetime.timedelta(days=1) # 1 day + self.expires_timestamp = calendar.timegm(self.expires.utctimetuple()) + self.expires_string = datetime.datetime.strftime(self.expires, '%a, %d %b %Y %H:%M:%S GMT') + + def test_path_normalization(self): + self.assertEqual(Cookie.normalize_url_path(''), '/') + self.assertEqual(Cookie.normalize_url_path('foo'), '/') + self.assertEqual(Cookie.normalize_url_path('foo/'), '/') + self.assertEqual(Cookie.normalize_url_path('/foo'), '/') + self.assertEqual(Cookie.normalize_url_path('/foo/'), '/foo') + self.assertEqual(Cookie.normalize_url_path('/Foo/bar'), '/foo') + self.assertEqual(Cookie.normalize_url_path('/foo/baR/'), '/foo/bar') + + def test_normalization(self): + cookie = Cookie('color', 'blue', expires=self.expires) + cookie.timestamp = self.now_timestamp + + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + + url = 'http://example.COM/foo' + cookie.normalize(url) + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/') + self.assertEqual(cookie.expires, self.expires) + + cookie = Cookie('color', 'blue', max_age=self.max_age) + cookie.timestamp = self.now_timestamp + + self.assertEqual(cookie.domain, None) + self.assertEqual(cookie.path, None) + + url = 'http://example.com/foo/' + cookie.normalize(url) + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/foo') + self.assertEqual(cookie.expires, self.age_expiration) + + cookie = Cookie('color', 'blue') + url = 'http://example.com/foo' + cookie.normalize(url) + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/') + + cookie = Cookie('color', 'blue') + url = 'http://example.com/foo/bar' + cookie.normalize(url) + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/foo') + + cookie = Cookie('color', 'blue') + url = 'http://example.com/foo/bar/' + cookie.normalize(url) + self.assertEqual(cookie.domain, 'example.com') + self.assertEqual(cookie.path, '/foo/bar') + + +#------------------------------------------------------------------------------- +if __name__ == '__main__': + unittest.main() |