summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xipa-client/ipa-install/ipa-client-install5
-rw-r--r--ipalib/rpc.py280
-rw-r--r--ipalib/session.py46
-rw-r--r--ipapython/cookie.py699
-rw-r--r--ipaserver/rpcserver.py6
-rwxr-xr-xmake-lint4
-rw-r--r--tests/test_ipapython/test_cookie.py475
7 files changed, 1435 insertions, 80 deletions
diff --git a/ipa-client/ipa-install/ipa-client-install b/ipa-client/ipa-install/ipa-client-install
index 7f50b2a2..7c329029 100755
--- a/ipa-client/ipa-install/ipa-client-install
+++ b/ipa-client/ipa-install/ipa-client-install
@@ -42,8 +42,7 @@ try:
from ipalib import api, errors
from ipapython.dn import DN
from ipapython.ssh import SSHPublicKey
- from ipapython import kernel_keyring
- from ipalib.rpc import COOKIE_NAME
+ from ipalib.rpc import delete_persistent_client_session_data
import SSSDConfig
from ConfigParser import RawConfigParser
from optparse import SUPPRESS_HELP, OptionGroup
@@ -1741,7 +1740,7 @@ def install(options, env, fstore, statestore):
# Clear out any current session keyring information
try:
- kernel_keyring.del_key(COOKIE_NAME % host_principal)
+ delete_persistent_client_session_data(host_principal)
except ValueError:
pass
diff --git a/ipalib/rpc.py b/ipalib/rpc.py
index c555105f..a5c5de55 100644
--- a/ipalib/rpc.py
+++ b/ipalib/rpc.py
@@ -37,6 +37,7 @@ import sys
import os
import errno
import locale
+import datetime
from xmlrpclib import (Binary, Fault, dumps, loads, ServerProxy, Transport,
ProtocolError, MININT, MAXINT)
import kerberos
@@ -48,8 +49,10 @@ from ipalib.errors import public_errors, PublicError, UnknownError, NetworkError
from ipalib import errors
from ipalib.request import context, Connection
from ipalib.util import get_current_principal
+from ipapython.ipa_log_manager import root_logger
from ipapython import ipautil
from ipapython import kernel_keyring
+from ipapython.cookie import Cookie
from ipalib.text import _
import httplib
@@ -61,7 +64,65 @@ from ipalib.krb_utils import KRB5KDC_ERR_S_PRINCIPAL_UNKNOWN, KRB5KRB_AP_ERR_TKT
KRB5_FCC_PERM, KRB5_FCC_NOFILE, KRB5_CC_FORMAT, KRB5_REALM_CANT_RESOLVE
from ipapython.dn import DN
-COOKIE_NAME = 'ipa_session_cookie:%s'
+COOKIE_NAME = 'ipa_session'
+KEYRING_COOKIE_NAME = '%s_cookie:%%s' % COOKIE_NAME
+
+
+def client_session_keyring_keyname(principal):
+ '''
+ Return the key name used for storing the client session data for
+ the given principal.
+ '''
+
+ return KEYRING_COOKIE_NAME % principal
+
+def update_persistent_client_session_data(principal, data):
+ '''
+ Given a principal create or update the session data for that
+ principal in the persistent secure storage.
+
+ Raises ValueError if unable to perform the action for any reason.
+ '''
+
+ try:
+ keyname = client_session_keyring_keyname(principal)
+ except Exception, e:
+ raise ValueError(str(e))
+
+ # kernel_keyring only raises ValueError (why??)
+ kernel_keyring.update_key(keyname, data)
+
+def read_persistent_client_session_data(principal):
+ '''
+ Given a principal return the stored session data for that
+ principal from the persistent secure storage.
+
+ Raises ValueError if unable to perform the action for any reason.
+ '''
+
+ try:
+ keyname = client_session_keyring_keyname(principal)
+ except Exception, e:
+ raise ValueError(str(e))
+
+ # kernel_keyring only raises ValueError (why??)
+ return kernel_keyring.read_key(keyname)
+
+def delete_persistent_client_session_data(principal):
+ '''
+ Given a principal remove the session data for that
+ principal from the persistent secure storage.
+
+ Raises ValueError if unable to perform the action for any reason.
+ '''
+
+ try:
+ keyname = client_session_keyring_keyname(principal)
+ except Exception, e:
+ raise ValueError(str(e))
+
+ # kernel_keyring only raises ValueError (why??)
+ kernel_keyring.del_key(keyname)
def xml_wrap(value):
"""
@@ -310,9 +371,9 @@ class KerbTransport(SSLTransport):
if not isinstance(extra_headers, list):
extra_headers = []
- session_data = getattr(context, 'session_data', None)
- if session_data:
- extra_headers.append(('Cookie', session_data))
+ session_cookie = getattr(context, 'session_cookie', None)
+ if session_cookie:
+ extra_headers.append(('Cookie', session_cookie))
return (host, extra_headers, x509)
# Set the remote host principal
@@ -345,16 +406,55 @@ class KerbTransport(SSLTransport):
finally:
self.close()
+ def store_session_cookie(self, cookie_header):
+ '''
+ Given the contents of a Set-Cookie header scan the header and
+ extract each cookie contained within until the session cookie
+ is located. Examine the session cookie if the domain and path
+ are specified, if not update the cookie with those values from
+ the request URL. Then write the session cookie into the key
+ store for the principal. If the cookie header is None or the
+ session cookie is not present in the header no action is
+ taken.
+
+ Context Dependencies:
+
+ The per thread context is expected to contain:
+ principal
+ The current pricipal the HTTP request was issued for.
+ request_url
+ The URL of the HTTP request.
+
+ '''
+
+ if cookie_header is None:
+ return
+
+ principal = getattr(context, 'principal', None)
+ request_url = getattr(context, 'request_url', None)
+ root_logger.debug("received Set-Cookie '%s'", cookie_header)
+
+ # Search for the session cookie
+ try:
+ session_cookie = Cookie.get_named_cookie_from_string(cookie_header,
+ COOKIE_NAME, request_url)
+ except Exception, e:
+ root_logger.error("unable to parse cookie header '%s': %s", cookie_header, e)
+ return
+
+ if session_cookie is None:
+ return
+
+ cookie_string = str(session_cookie)
+ root_logger.debug("storing cookie '%s' for principal %s", cookie_string, principal)
+ try:
+ update_persistent_client_session_data(principal, cookie_string)
+ except Exception, e:
+ # Not fatal, we just can't use the session cookie we were sent.
+ pass
+
def parse_response(self, response):
- session_cookie = response.getheader('Set-Cookie')
- if session_cookie:
- principal = getattr(context, 'principal', None)
- try:
- kernel_keyring.update_key(COOKIE_NAME % principal, session_cookie)
- except ValueError, e:
- # Not fatal, we just can't use the session cookie we were
- # sent.
- pass
+ self.store_session_cookie(response.getheader('Set-Cookie'))
return SSLTransport.parse_response(self, response)
@@ -377,22 +477,6 @@ class xmlclient(Connectible):
super(xmlclient, self).__init__()
self.__errors = dict((e.errno, e) for e in public_errors)
- def reconstruct_url(self):
- """
- The URL directly isn't stored in the ServerProxy. We can't store
- it in the connection object itself but we can reconstruct it
- from the ServerProxy.
- """
- if not hasattr(self.conn, '_ServerProxy__transport'):
- return None
- if (isinstance(self.conn._ServerProxy__transport, KerbTransport) or
- isinstance(self.conn._ServerProxy__transport, DelegatedKerbTransport)):
- scheme = "https"
- else:
- scheme = "http"
- server = '%s://%s%s' % (scheme, ipautil.format_netloc(self.conn._ServerProxy__host), self.conn._ServerProxy__handler)
- return server
-
def get_url_list(self, xmlrpc_uri):
"""
Create a list of urls consisting of the available IPA servers.
@@ -425,40 +509,122 @@ class xmlclient(Connectible):
return servers
+ def get_session_cookie_from_persistent_storage(self, principal):
+ '''
+ Retrieves the session cookie for the given principal from the
+ persistent secure storage. Returns None if not found or unable
+ to retrieve the session cookie for any reason, otherwise
+ returns a Cookie object containing the session cookie.
+ '''
+
+ # Get the session data, it should contain a cookie string
+ # (possibly with more than one cookie).
+ try:
+ cookie_string = read_persistent_client_session_data(principal)
+ except Exception, e:
+ return None
+
+ # Search for the session cookie within the cookie string
+ try:
+ session_cookie = Cookie.get_named_cookie_from_string(cookie_string, COOKIE_NAME)
+ except Exception, e:
+ return None
+
+ return session_cookie
+
+ def apply_session_cookie(self, url):
+ '''
+ Attempt to load a session cookie for the current principal
+ from the persistent secure storage. If the cookie is
+ successfully loaded adjust the input url's to point to the
+ session path and insert the session cookie into the per thread
+ context for later insertion into the HTTP request. If the
+ cookie is not successfully loaded then the original url is
+ returned and the per thread context is not modified.
+
+ Context Dependencies:
+
+ The per thread context is expected to contain:
+ principal
+ The current pricipal the HTTP request was issued for.
+
+ The per thread context will be updated with:
+ session_cookie
+ A cookie string to be inserted into the Cookie header
+ of the HTPP request.
+
+ '''
+
+ original_url = url
+ principal = getattr(context, 'principal', None)
+
+ session_cookie = self.get_session_cookie_from_persistent_storage(principal)
+ if session_cookie is None:
+ self.log.debug("failed to find session_cookie in persistent storage for principal '%s'",
+ principal)
+ return original_url
+ else:
+ self.debug("found session_cookie in persistent storage for principal '%s', cookie: '%s'",
+ principal, session_cookie)
+
+ # Decide if we should send the cookie to the server
+ try:
+ session_cookie.http_return_ok(original_url)
+ except Cookie.Expired, e:
+ self.debug("deleting session data for principal '%s': %s", principal, e)
+ try:
+ delete_persistent_client_session_data(principal)
+ except Exception, e:
+ pass
+ return original_url
+ except Cookie.URLMismatch, e:
+ self.debug("not sending session cookie, URL mismatch: %s", e)
+ return original_url
+ except Exception, e:
+ self.error("not sending session cookie, unknown error: %s", e)
+ return original_url
+
+ # O.K. session_cookie is valid to be returned, stash it away where it will will
+ # get included in a HTTP Cookie headed sent to the server.
+ self.log.debug("setting session_cookie into context '%s'", session_cookie.http_cookie())
+ setattr(context, 'session_cookie', session_cookie.http_cookie())
+
+ # Form the session URL by substituting the session path into the original URL
+ scheme, netloc, path, params, query, fragment = urlparse.urlparse(original_url)
+ path = '/ipa/session/xml'
+ session_url = urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
+
+ return session_url
+
def create_connection(self, ccache=None, verbose=False, fallback=True,
delegate=False):
try:
- session = False
- session_data = None
xmlrpc_uri = self.env.xmlrpc_uri
principal = get_current_principal()
setattr(context, 'principal', principal)
# We have a session cookie, try using the session URI to see if it
# is still valid
if not delegate:
- session_data = kernel_keyring.read_key(COOKIE_NAME % principal)
- setattr(context, 'session_data', session_data)
- (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(self.env.xmlrpc_uri)
- xmlrpc_uri = urlparse.urlunparse((scheme, netloc, '/ipa/session/xml', params, query, fragment))
- session = True
+ xmlrpc_uri = self.apply_session_cookie(xmlrpc_uri)
except ValueError:
# No session key, do full Kerberos auth
pass
- servers = self.get_url_list(xmlrpc_uri)
+ urls = self.get_url_list(xmlrpc_uri)
serverproxy = None
- for server in servers:
+ for url in urls:
kw = dict(allow_none=True, encoding='UTF-8')
kw['verbose'] = verbose
- if server.startswith('https://'):
+ if url.startswith('https://'):
if delegate:
kw['transport'] = DelegatedKerbTransport()
else:
kw['transport'] = KerbTransport()
else:
kw['transport'] = LanguageAwareTransport()
- self.log.info('trying %s' % server)
- serverproxy = ServerProxy(server, **kw)
- if len(servers) == 1:
+ self.log.info('trying %s' % url)
+ setattr(context, 'request_url', url)
+ serverproxy = ServerProxy(url, **kw)
+ if len(urls) == 1:
# if we have only 1 server and then let the
# main requester handle any errors. This also means it
# must handle a 401 but we save a ping.
@@ -476,7 +642,7 @@ class xmlclient(Connectible):
raise UnknownError(
code=e.faultCode,
error=e.faultString,
- server=server,
+ server=url,
)
# We don't care about the response, just that we got one
break
@@ -484,14 +650,13 @@ class xmlclient(Connectible):
# kerberos error on one server is likely on all
raise errors.KerberosError(major=str(krberr), minor='')
except ProtocolError, e:
- if session_data and e.errcode == 401:
+ if hasattr(context, 'session_cookie') and e.errcode == 401:
# Unauthorized. Remove the session and try again.
- delattr(context, 'session_data')
+ delattr(context, 'session_cookie')
try:
- kernel_keyring.del_key(COOKIE_NAME % principal)
- except ValueError:
- # This shouldn't happen if we have a session but
- # it isn't fatal.
+ delete_persistent_client_session_data(principal)
+ except Exception, e:
+ # This shouldn't happen if we have a session but it isn't fatal.
pass
return self.create_connection(ccache, verbose, fallback, delegate)
if not fallback:
@@ -504,7 +669,7 @@ class xmlclient(Connectible):
if serverproxy is None:
raise NetworkError(uri=_('any of the configured servers'),
- error=', '.join(servers))
+ error=', '.join(urls))
return serverproxy
def destroy_connection(self):
@@ -529,7 +694,7 @@ class xmlclient(Connectible):
raise ValueError(
'%s.forward(): %r not in api.Command' % (self.name, name)
)
- server = self.reconstruct_url()
+ server = getattr(context, 'request_url', None)
self.info('Forwarding %r to server %r', name, server)
command = getattr(self.conn, name)
params = [args, kw]
@@ -554,16 +719,15 @@ class xmlclient(Connectible):
# By catching a 401 here we can detect the case where we have
# a single IPA server and the session is invalid. Otherwise
# we always have to do a ping().
- session_data = getattr(context, 'session_data', None)
- if session_data and e.errcode == 401:
+ session_cookie = getattr(context, 'session_cookie', None)
+ if session_cookie and e.errcode == 401:
# Unauthorized. Remove the session and try again.
- delattr(context, 'session_data')
+ delattr(context, 'session_cookie')
try:
principal = getattr(context, 'principal', None)
- kernel_keyring.del_key(COOKIE_NAME % principal)
- except ValueError:
- # This shouldn't happen if we have a session but
- # it isn't fatal.
+ delete_persistent_client_session_data(principal)
+ except Exception, e:
+ # This shouldn't happen if we have a session but it isn't fatal.
pass
# Create a new serverproxy with the non-session URI. If there
diff --git a/ipalib/session.py b/ipalib/session.py
index 36beecec..68b9b264 100644
--- a/ipalib/session.py
+++ b/ipalib/session.py
@@ -17,17 +17,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import memcache
-import Cookie
import random
import errors
import os
import re
import time
+from urllib2 import urlparse
from text import _
from ipapython.ipa_log_manager import *
from ipalib import api, errors
from ipalib import Command
from ipalib.krb_utils import *
+from ipapython.cookie import Cookie
__doc__ = '''
Session Support for IPA
@@ -954,14 +955,24 @@ class MemcacheSessionManager(SessionManager):
:returns:
Session id as string or None if not found.
'''
+
+ if cookie_header is None:
+ return None
+
session_id = None
- if cookie_header is not None:
- cookie = Cookie.SimpleCookie()
- cookie.load(cookie_header)
- session_cookie = cookie.get(self.session_cookie_name)
- if session_cookie is not None:
- session_id = session_cookie.value
- self.debug('found session cookie_id = %s', session_id)
+
+ try:
+ session_cookie = Cookie.get_named_cookie_from_string(cookie_header, self.session_cookie_name)
+ except Exception, e:
+ session_cookie = None
+ if session_cookie:
+ session_id = session_cookie.value
+
+ if session_id is None:
+ self.debug('no session cookie found')
+ else:
+ self.debug('found session cookie_id = %s', session_id)
+
return session_id
@@ -1050,7 +1061,7 @@ class MemcacheSessionManager(SessionManager):
self.mc.set(session_key, session_data, time=session_expiration_timestamp)
return session_id
- def generate_cookie(self, url_path, session_id, add_header=False):
+ def generate_cookie(self, url_path, session_id, expiration=None, add_header=False):
'''
Return a session cookie containing the session id. The cookie
will be contrainted to the url path, defined for use
@@ -1068,15 +1079,18 @@ class MemcacheSessionManager(SessionManager):
:returns:
cookie string
'''
- cookie = Cookie.SimpleCookie()
- cookie[self.session_cookie_name] = session_id
- cookie[self.session_cookie_name]['path'] = url_path
- cookie[self.session_cookie_name]['httponly'] = True
- cookie[self.session_cookie_name]['secure'] = True
+
+ if not expiration: # Catch zero unix timestamps
+ expiration = None;
+
+ cookie = Cookie(self.session_cookie_name, session_id,
+ domain=urlparse.urlparse(api.env.xmlrpc_uri).netloc,
+ path=url_path, httponly=True, secure=True,
+ expires=expiration)
if add_header:
- result = cookie.output().strip()
+ result = 'Set-Cookie: %s' % cookie
else:
- result = cookie.output(header='').strip()
+ result = str(cookie)
return result
diff --git a/ipapython/cookie.py b/ipapython/cookie.py
new file mode 100644
index 00000000..b45cb2b1
--- /dev/null
+++ b/ipapython/cookie.py
@@ -0,0 +1,699 @@
+# Authors:
+# John Dennis <jdennis@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import re
+import time
+import datetime
+from urllib2 import urlparse
+from calendar import timegm
+from ipapython.ipa_log_manager import log_mgr
+
+'''
+Core Python has two cookie libraries, Cookie.py targeted to server
+side and cookielib.py targeted to client side. So why this module and
+not use the standard libraries?
+
+Cookie.py has some serious bugs, it cannot correctly parse the
+HttpOnly, Secure, and Expires cookie attributes (more of a client side
+need and not what it was designed for). Since we utilize those
+attributes that makes Cookie.py a non-starter. Plus it's API awkard
+and limited (we would have to build more on top of it).
+
+The Cookie.py bug reports are:
+
+http://bugs.python.org/issue3073
+http://bugs.python.org/issue16611
+
+cookielib.py has a lot of good featuress, a nice API and covers all
+the relevant RFC's as well as actual practice in the field. However
+cookielib.py is tighly integrated with urllib2 and it's not possible
+to use most of the features of cookielib without simultaneously using
+urllib2. Unfortunataely we only use httplib because of our dependency
+on xmlrpclib. Without urllib2 cookielib is a non-starter.
+
+This module is a minimal implementation of Netscape cookies which
+works equally well on either the client or server side. It's API is
+easy to use with cookie attributes as class properties which can be
+read or set easily. The Cookie object automatically converts Expires
+and Max-Age attributes into datetime objects for easy time
+comparision. Cookies in strings can easily be parsed, including
+multiple cookies in the HTTP_COOKIE envionment variable.
+
+The cookie RFC is silent on any escaping requirements for cookie
+contents as such this module does not provide any automated support
+escaping and unescapin.
+
+'''
+
+#-------------------------------------------------------------------------------
+
+# FIXME: The use of properties for the attributes timestamp, expires
+# and max_age produce a pylint error which is a false positive, this
+# is a known bug in pylint (http://www.logilab.org/ticket/89092,
+# http://www.logilab.org/ticket/89786) after the pylint bug is fixed
+# the disables for E0202 should be removed.
+
+class Cookie(object):
+ '''
+ A Cookie object has the following attributes:
+
+ key
+ The name of the cookie
+ value
+ The value of the cookie
+
+ A Cookie also supports these predefined optional attributes. If an
+ optional attribute is not set on the cookie it's value is None.
+
+ domain
+ Restrict cookie usage to this domain
+ path
+ Restrict cookie usage to this path or below
+ expires
+ Cookie is invalid after this UTC timestamp
+ max_age
+ Cookie is invalid this many seconds in the future.
+ Has precedence over the expires attribute.
+ secure
+ Cookie should only be returned on secure (i.e. SSL/TLS)
+ connections.
+ httponly
+ Cookie is intended only for HTTP communication, it can
+ never be utilized in any other context (e.g. browser
+ Javascript).
+
+ See the documentation of get_expiration() for an explanation of
+ how the expires and max-age attributes interact as well as the
+ role of the timestamp attribute. Expiration values are stored as
+ datetime objects for easy manipulation and comparision.
+
+ There are two ways to instantiate a Cookie object. Either directly
+ via the constructor or by calling the class function parse() which
+ returns a list of Cookie objects found in a string.
+
+ To create a cookie to sent to a client:
+
+ Example:
+
+ cookie = Cookie('session', session_id,
+ domain=my_domain, path=mypath,
+ httpOnly=True, secure=True, expires=expiration)
+ headers.append(('Set-Cookie', str(cookie)))
+
+
+ To receive cookies from a request:
+
+ Example:
+
+ cookies = Cookie.parse(response.getheader('Set-Cookie'), request_url)
+
+ '''
+
+ class Expired(ValueError):
+ pass
+
+ class URLMismatch(ValueError):
+ pass
+
+ # regexp to split fields at a semi-colon
+ field_re = re.compile(r';\s*')
+
+ # regexp to locate a key/value pair
+ kv_pair_re = re.compile(r'^\s*([a-zA-Z0-9\!\#\$\%\&\'\*\+\-\.\^\_\`\|\~]+)\s*=\s*(.*?)\s*$', re.IGNORECASE)
+
+ # Reserved attribute names, maps from lower case protocol name to
+ # object attribute name
+ attrs = {'domain' : 'domain',
+ 'path' : 'path',
+ 'max-age' : 'max_age',
+ 'expires' : 'expires',
+ 'secure' : 'secure',
+ 'httponly' : 'httponly'}
+
+ @classmethod
+ def datetime_to_time(cls, dt):
+ '''
+ Timestamps (timestamp & expires) are stored as datetime
+ objects in UTC. It's non-obvious how to convert a naive UTC
+ datetime into a unix time value (seconds since the epoch
+ UTC). That functionality is oddly missing from the datetime
+ and time modules. This utility provides that missing
+ functionality.
+ '''
+ # Use timegm from the calendar module
+ return timegm(dt.utctimetuple())
+
+ @classmethod
+ def datetime_to_string(cls, dt=None):
+ '''
+ Given a datetime object in UTC generate RFC 1123 date string.
+ '''
+
+ # Try to verify dt is specified as UTC. If utcoffset is not
+ # available we'll just have to assume the caller is using the
+ # correct timezone.
+ utcoffset = dt.utcoffset()
+ if utcoffset is not None and utcoffset.total_seconds() != 0.0:
+ raise ValueError("timezone is not UTC")
+
+ # At this point we've validated as much as possible the
+ # timezone is UTC or GMT but we can't use the %Z timezone
+ # format specifier because the timezone in the string must be
+ # 'GMT', not something equivalent to GMT, so hardcode the GMT
+ # timezone string into the format.
+
+ return datetime.datetime.strftime(dt, '%a, %d %b %Y %H:%M:%S GMT')
+
+ @classmethod
+ def parse_datetime(cls, s):
+ '''
+ Parse a RFC 822, RFC 1123 date string, return a datetime aware object in UTC.
+ Accommodates some non-standard formats found in the wild.
+ '''
+
+ formats = ['%a, %d %b %Y %H:%M:%S',
+ '%a, %d-%b-%Y %H:%M:%S',
+ '%a, %d-%b-%y %H:%M:%S',
+ '%a, %d %b %y %H:%M:%S',
+ ]
+ s = s.strip()
+
+ # strptime does not read the time zone and generate a tzinfo
+ # object to insert in the datetime object so there is little point
+ # in specifying a %Z format, instead verify GMT is specified and
+ # generate the datetime object as if it were UTC.
+
+ if not s.endswith(' GMT'):
+ raise ValueError("http date string '%s' does not end with GMT time zone" % s)
+ s = s[:-4]
+
+ dt = None
+ for format in formats:
+ try:
+ dt = datetime.datetime(*(time.strptime(s, format)[0:6]))
+ break
+ except Exception:
+ continue
+
+ if dt is None:
+ raise ValueError("unable to parse expires datetime '%s'" % s)
+
+ return dt
+
+ @classmethod
+ def normalize_url_path(cls, url_path):
+ '''
+ Given a URL path, possibly empty, return a path consisting
+ only of directory components. The URL path must end with a
+ trailing slash for the last path element to be considered a
+ directory. Also the URL path must begin with a slash. Empty
+ input returns '/'.
+
+ Examples:
+
+ '' -> '/'
+ '/' -> '/'
+ 'foo' -> '/'
+ 'foo/' -> '/'
+ '/foo -> '/'
+ '/foo/' -> '/foo'
+ '/foo/bar' -> '/foo'
+ '/foo/bar/' -> '/foo/bar'
+ '''
+ url_path = url_path.lower()
+
+ if not url_path:
+ return '/'
+
+ if not url_path.startswith('/'):
+ return '/'
+
+ if url_path.count('/') <= 1:
+ return'/'
+
+ return url_path[:url_path.rindex('/')]
+
+
+ @classmethod
+ def parse(cls, cookie_string, request_url=None):
+ '''
+ Given a string containing one or more cookies (the
+ HTTP_COOKIES environment variable typically contains multiple
+ cookies) parse the string and return a list of Cookie objects
+ found in the string.
+ '''
+
+ # Our list of returned cookies
+ cookies = []
+
+ # Split the input string at semi-colon boundaries, we call this a
+ # field. A field may either be a single keyword or a key=value
+ # pair.
+ fields = Cookie.field_re.split(cookie_string)
+
+ # The input string may have multiple cookies inside it. This is
+ # common when the string comes from a HTTP_COOKIE environment
+ # variable. All the cookies will be contenated, separated by a
+ # semi-colon. Semi-colons are also the separator between
+ # attributes in a cookie.
+ #
+ # To distinguish between two adjacent cookies in a string we
+ # have to locate the key=value pair at the start of a
+ # cookie. Unfortunately cookies have attributes that also look
+ # like key/value pairs, the only way to distinguish a cookie
+ # attribute from a cookie is the fact the attribute names are
+ # reserved. A cookie attribute may either be a key/value pair
+ # or a single key (e.g. HttpOnly). As we scan the cookie we
+ # first identify the key=value (cookie name, cookie
+ # value). Then we continue scanning, if a bare key or
+ # key/value pair follows and is a known reserved keyword than
+ # that's an attribute belonging to the current cookie. As soon
+ # as we see a key/value pair whose key is not reserved we know
+ # we've found a new cookie. Bare keys (no value) can never
+ # start a new cookie.
+
+ # Iterate over all the fields and emit a new cookie whenever the
+ # next field is not a known attribute.
+ cookie = None
+ for field in fields:
+ match = Cookie.kv_pair_re.search(field)
+ if match:
+ key = match.group(1)
+ value = match.group(2)
+ # Double quoted value?
+ if value[0] == '"':
+ if value[-1] == '"':
+ value = value[1:-1]
+ else:
+ raise ValueError("unterminated quote in '%s'" % value)
+ kv_pair = True
+ else:
+ key = field
+ value = True # True because bare keys are boolean flags
+ kv_pair = False
+
+ is_attribute = key.lower() in Cookie.attrs
+
+ # First cookie found, create new cookie object
+ if cookie is None and kv_pair and not is_attribute:
+ cookie = Cookie(key, value)
+
+ # If start of new cookie then flush previous cookie and create
+ # a new one (it's a new cookie because it's a key/value pair
+ # whose key is not a reserved keyword).
+ elif cookie and kv_pair and not is_attribute:
+ if request_url is not None:
+ cookie.normalize(request_url)
+ cookies.append(cookie)
+ cookie = Cookie(key, value)
+
+ # If it's a reserved keyword add that as an attribute to the
+ # current cookie being scanned.
+ elif cookie and is_attribute:
+ cookie.__set_attr(key, value)
+ # If we've found a non-empty single token that's not a
+ # reserved keyword it's an error. An empty token can occur
+ # when there are two adjacent semi-colons (i.e. "; ;").
+ # We don't consider empty tokens an error.
+ elif key:
+ raise ValueError("unknown cookie token '%s'" % key)
+
+ # Flush out final cookie
+ if cookie:
+ if request_url is not None:
+ cookie.normalize(request_url)
+ cookies.append(cookie)
+
+ return cookies
+
+ @classmethod
+ def get_named_cookie_from_string(cls, cookie_string, cookie_name, request_url=None):
+ '''
+ A cookie string may contain multiple cookies, parse the cookie
+ string and return the last cookie in the string matching the
+ cookie name or None if not found.
+
+ This is basically a utility wrapper around the parse() class
+ method which iterates over what parse() returns looking for
+ the specific cookie.
+
+ When cookie_name appears more than once the last instance is
+ returned rather than the first because the ordering sequence
+ makes the last instance the current value.
+ '''
+
+ target_cookie = None
+
+ cookies = cls.parse(cookie_string)
+ for cookie in cookies:
+ if cookie.key == cookie_name:
+ target_cookie = cookie
+
+ if request_url is not None:
+ target_cookie.normalize(request_url)
+ return target_cookie
+
+
+ def __init__(self, key, value, domain=None, path=None, max_age=None, expires=None,
+ secure=None, httponly=None, timestamp=None):
+
+ log_mgr.get_logger(self, True)
+
+ self.key = key
+ self.value = value
+ self.domain = domain
+ self.path = path
+ self.max_age = max_age
+ self.expires = expires
+ self.secure = secure
+ self.httponly = httponly
+ self.timestamp = timestamp
+
+ @property
+ def timestamp(self): #pylint: disable=E0202
+ '''
+ The UTC moment at which cookie was received for purposes of
+ computing the expiration given a Max-Age offset. The
+ expiration will be timestamp + max_age. The timestamp value
+ will aways be a datetime object.
+
+ By default the timestamp will be the moment the Cookie object
+ is created as this often corresponds to the moment the cookie
+ is received (the intent of the Max-Age attribute). But becuase
+ it's sometimes desirable to force a specific moment for
+ purposes of computing the expiration from the Max-Age the
+ Cookie timestamp can be updated.
+
+ Setting a value of None causes the timestamp to be set to the
+ current UTC time (now). You may also assign with a numeric
+ UNIX timestamp (seconds since the epoch UTC) or a formatted time
+ sting, in all cases the value will be converted to a datetime
+ object.
+ '''
+ return self._timestamp
+
+ @timestamp.setter
+ def timestamp(self, value): #pylint: disable=E0202
+ if value is None:
+ self._timestamp = None
+ elif isinstance(value, datetime.datetime):
+ self._timestamp = value
+ elif isinstance(value, (int, long, float)):
+ self._timestamp = datetime.datetime.utcfromtimestamp(value)
+ elif isinstance(value, basestring):
+ self._timestamp = Cookie.parse_datetime(value)
+ else:
+ raise TypeError('value must be datetime, int, long, float, basestring or None, not %s' % \
+ value.__class__.__name__)
+
+ @property
+ def expires(self): #pylint: disable=E0202
+ '''
+ The expiration timestamp (in UTC) as a datetime object for the
+ cookie, or None if not set.
+
+ You may assign a value of None, a datetime object, a numeric
+ UNIX timestamp (seconds since the epoch UTC) or formatted time
+ string (the latter two will be converted to a datetime object.
+ '''
+ return self._expires
+
+ @expires.setter
+ def expires(self, value): #pylint: disable=E0202
+ if value is None:
+ self._expires = None
+ elif isinstance(value, datetime.datetime):
+ self._expires = value
+ elif isinstance(value, (int, long, float)):
+ self._expires = datetime.datetime.utcfromtimestamp(value)
+ elif isinstance(value, basestring):
+ self._expires = Cookie.parse_datetime(value)
+ else:
+ raise TypeError('value must be datetime, int, long, float, basestring or None, not %s' % \
+ value.__class__.__name__)
+
+ @property
+ def max_age(self): #pylint: disable=E0202
+ '''
+ The lifetime duration of the cookie. Computed as an offset
+ from the cookie's timestamp.
+ '''
+ return self._max_age
+
+ @max_age.setter
+ def max_age(self, value): #pylint: disable=E0202
+ if value is None:
+ self._max_age = None
+ else:
+ try:
+ self._max_age = int(value)
+ except Exception:
+ raise ValueError("Max-Age value '%s' not convertable to integer" % value)
+
+ def __set_attr(self, name, value):
+ '''
+ Sets one of the predefined cookie attributes.
+ '''
+ attr_name = Cookie.attrs.get(name.lower(), None)
+ if attr_name is None:
+ raise ValueError("unknown cookie attribute '%s'" % name)
+ setattr(self, attr_name, value)
+
+ def __str__(self):
+ components = []
+
+ components.append("%s=%s" % (self.key, self.value))
+
+ if self.domain is not None:
+ components.append("Domain=%s" % self.domain)
+
+ if self.path is not None:
+ components.append("Path=%s" % self.path)
+
+ if self.max_age is not None:
+ components.append("Max-Age=%s" % self.max_age)
+
+ if self.expires is not None:
+ components.append("Expires=%s" % Cookie.datetime_to_string(self.expires))
+
+ if self.secure:
+ components.append("Secure")
+
+ if self.httponly:
+ components.append("HttpOnly")
+
+ return '; '.join(components)
+
+ def get_expiration(self):
+ '''
+ Return the effective expiration of the cookie as a datetime
+ object or None if no expiration is defined. Expiration may be
+ defined either by the "Expires" timestamp attribute or the
+ "Max-Age" duration attribute. If both are set "Max-Age" takes
+ precedence. If neither is set the cookie has no expiration and
+ None will be returned.
+
+ "Max-Age" specifies the number of seconds in the future from when the
+ cookie is received until it expires. Effectively it means
+ adding "Max-Age" seconds to a timestamp to arrive at an
+ expiration. By default the timestamp used to mark the arrival
+ of the cookie is set to the moment the cookie object is
+ created. However sometimes it is desirable to adjust the
+ received timestamp to something other than the moment of
+ object creation, therefore you can explicitly set the arrival
+ timestamp used in the "Max-Age" calculation.
+
+ "Expires" specifies an explicit timestamp.
+
+ If "Max-Age" is set a datetime object is returned which is the
+ sum of the arrival timestamp and "Max-Age".
+
+ If "Expires" is set a datetime object is returned matching the
+ timestamp specified as the "Expires" value.
+
+ If neither is set None is returned.
+ '''
+
+ if self.max_age is not None:
+ return self.timestamp + datetime.timedelta(seconds=self.max_age)
+
+ if self.expires is not None:
+ return self.expires
+
+ return None
+
+ def normalize_expiration(self):
+ '''
+ An expiration may be specified either with an explicit
+ timestamp in the "Expires" attribute or via an offset
+ specified witht the "Max-Age" attribute. The "Max-Age"
+ attribute has precedence over "Expires" if both are
+ specified.
+
+ This method normalizes the expiration of the cookie such that
+ only a "Expires" attribute remains after consideration of the
+ "Max-Age" attribute. This is useful when storing the cookie
+ for future reference.
+ '''
+
+ self.expires = self.get_expiration()
+ self.max_age = None
+ return self.expires
+
+ def set_defaults_from_url(self, url):
+ '''
+ If cookie domain and path attributes are not specified then
+ they assume defaults from the request url the cookie was
+ received from.
+ '''
+
+ scheme, domain, path, params, query, fragment = urlparse.urlparse(url)
+
+ if self.domain is None:
+ self.domain = domain.lower()
+
+ if self.path is None:
+ self.path = self.normalize_url_path(path)
+
+
+ def normalize(self, url):
+ '''
+ Missing cookie attributes will receive default values derived
+ from the request URL. The expiration value is normalized.
+ '''
+
+ self.set_defaults_from_url(url)
+ self.normalize_expiration()
+
+ def http_cookie(self):
+ '''
+ Return a string with just the key and value (no attributes).
+ This is appropriate for including in a HTTP Cookie header.
+ '''
+ return '%s=%s;' % (self.key, self.value)
+
+ def http_return_ok(self, url):
+ '''
+ Tests to see if a cookie should be returned when a request is
+ sent to a specific URL.
+
+ * The request url's host must match the cookie's doman
+ otherwise raises Cookie.URLMismatch.
+
+ * The path in the request url must contain the cookie's path
+ otherwise raises Cookie.URLMismatch.
+
+ * If the cookie defines an expiration date then the current
+ time must be less or equal to the cookie's expiration
+ timestamp. Will raise Cookie.Expired if a defined expiration
+ is not valid.
+
+ If the test fails Cookie.Expired or Cookie.URLMismatch will be raised,
+ otherwise True is returned.
+
+ '''
+
+ def domain_valid(url_domain, cookie_domain):
+ '''
+ Compute domain component and perform test per
+ RFC 6265, Section 5.1.3. "Domain Matching"
+ '''
+ # FIXME: At the moment we can't import from ipalib at the
+ # module level because of a dependency loop (cycle) in the
+ # import. Our module layout needs to be refactored.
+ from ipalib.util import validate_domain_name
+ try:
+ validate_domain_name(url_domain)
+ except Exception, e:
+ return False
+
+ if cookie_domain is None:
+ return True
+
+ url_domain = url_domain.lower()
+ cookie_domain = cookie_domain.lower()
+
+ if url_domain == cookie_domain:
+ return True
+
+ if url_domain.endswith(cookie_domain):
+ if cookie_domain.startswith('.'):
+ return True
+
+ return False
+
+ def path_valid(url_path, cookie_path):
+ '''
+ Compute path component and perform test per
+ RFC 6265, Section 5.1.4. "Paths and Path-Match"
+ '''
+
+ if cookie_path is None:
+ return True
+
+ cookie_path = cookie_path.lower()
+ request_path = self.normalize_url_path(url_path)
+
+ if cookie_path == request_path:
+ return True
+
+ if cookie_path and request_path.startswith(cookie_path):
+ if cookie_path.endswith('/'):
+ return True
+
+ tail = request_path[len(cookie_path):]
+ if tail.startswith('/'):
+ return True
+
+ return False
+
+ cookie_name = self.key
+
+ url_scheme, url_domain, url_path, url_params, url_query, url_fragment = urlparse.urlparse(url)
+
+ cookie_expiration = self.get_expiration()
+ if cookie_expiration is not None:
+ now = datetime.datetime.utcnow()
+ if cookie_expiration < now:
+ raise Cookie.Expired("cookie named '%s'; expired at %s'" % \
+ (cookie_name,
+ self.datetime_to_string(cookie_expiration)))
+
+ if not domain_valid(url_domain, self.domain):
+ raise Cookie.URLMismatch("cookie named '%s'; it's domain '%s' does not match URL domain '%s'" % \
+ (cookie_name, self.domain, url_domain))
+
+ if not path_valid(url_path, self.path):
+ raise Cookie.URLMismatch("cookie named '%s'; it's path '%s' does not contain the URL path '%s'" % \
+ (cookie_name, self.path, url_path))
+
+ url_scheme = url_scheme.lower()
+
+ if self.httponly:
+ if url_scheme not in ('http', 'https'):
+ raise Cookie.URLMismatch("cookie named '%s'; is restricted to HTTP but it's URL scheme is '%s'" % \
+ (cookie_name, url_scheme))
+
+ if self.secure:
+ if url_scheme not in ('https',):
+ raise Cookie.URLMismatch("cookie named '%s'; is restricted to secure transport but it's URL scheme is '%s'" % \
+ (cookie_name, url_scheme))
+
+
+ return True
diff --git a/ipaserver/rpcserver.py b/ipaserver/rpcserver.py
index d2f2acd9..8bce48be 100644
--- a/ipaserver/rpcserver.py
+++ b/ipaserver/rpcserver.py
@@ -384,7 +384,8 @@ class WSGIExecutioner(Executioner):
if session_data is not None:
# Send session cookie back and store session data
# FIXME: the URL path should be retreived from somewhere (but where?), not hardcoded
- session_cookie = session_mgr.generate_cookie('/ipa', session_data['session_id'])
+ session_cookie = session_mgr.generate_cookie('/ipa', session_data['session_id'],
+ session_data['session_expiration_timestamp'])
headers.append(('Set-Cookie', session_cookie))
start_response(status, headers)
@@ -666,7 +667,8 @@ class KerberosSession(object):
release_ipa_ccache(ccache_name)
# Return success and set session cookie
- session_cookie = session_mgr.generate_cookie('/ipa', session_id)
+ session_cookie = session_mgr.generate_cookie('/ipa', session_id,
+ session_data['session_expiration_timestamp'])
headers.append(('Set-Cookie', session_cookie))
start_response(HTTP_STATUS_SUCCESS, headers)
diff --git a/make-lint b/make-lint
index 29ce758e..ae09e2a1 100755
--- a/make-lint
+++ b/make-lint
@@ -64,7 +64,8 @@ class IPATypeChecker(TypeChecker):
'pattern', 'pattern_errmsg'],
'ipalib.parameters.Enum': ['values'],
'ipalib.parameters.File': ['stdin_if_missing'],
- 'urlparse.SplitResult': ['netloc'],
+ 'urlparse.SplitResult': ['scheme', 'netloc', 'path', 'query', 'fragment', 'username', 'password', 'hostname', 'port'],
+ 'urlparse.ParseResult': ['scheme', 'netloc', 'path', 'params', 'query', 'fragment', 'username', 'password', 'hostname', 'port'],
'ipaserver.install.ldapupdate.LDAPUpdate' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
'ipaserver.plugins.ldap2.SchemaCache' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
'ipaserver.plugins.ldap2.IPASimpleLDAPObject' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
@@ -78,6 +79,7 @@ class IPATypeChecker(TypeChecker):
'ipalib.session.SessionCCache' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
'ipalib.session.MemcacheSessionManager' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
'ipapython.admintool.AdminTool' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
+ 'ipapython.cookie.Cookie' : ['log', 'debug', 'info', 'warning', 'error', 'critical', 'exception'],
}
def _related_classes(self, klass):
diff --git a/tests/test_ipapython/test_cookie.py b/tests/test_ipapython/test_cookie.py
new file mode 100644
index 00000000..f8c5daf4
--- /dev/null
+++ b/tests/test_ipapython/test_cookie.py
@@ -0,0 +1,475 @@
+# Authors:
+# John Dennis <jdennis@redhat.com>
+#
+# Copyright (C) 2012 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import unittest
+import time
+import datetime
+import calendar
+from ipapython.cookie import Cookie
+
+class TestParse(unittest.TestCase):
+
+ def test_parse(self):
+ # Empty string
+ s = ''
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 0)
+
+ # Invalid single token
+ s = 'color'
+ with self.assertRaises(ValueError):
+ cookies = Cookie.parse(s)
+
+ # Invalid single token that's keyword
+ s = 'HttpOnly'
+ with self.assertRaises(ValueError):
+ cookies = Cookie.parse(s)
+
+ # Invalid key/value pair whose key is a keyword
+ s = 'domain=example.com'
+ with self.assertRaises(ValueError):
+ cookies = Cookie.parse(s)
+
+ # 1 cookie with name/value
+ s = 'color=blue'
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue")
+ self.assertEqual(cookie.http_cookie(), "color=blue;")
+
+ # 1 cookie with whose value is quoted
+ # Use "get by name" utility to extract specific cookie
+ s = 'color="blue"'
+ cookie = Cookie.get_named_cookie_from_string(s, 'color')
+ self.assertIsNotNone(cookie)
+ self.assertIsNotNone(cookie, Cookie)
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue")
+ self.assertEqual(cookie.http_cookie(), "color=blue;")
+
+ # 1 cookie with name/value and domain, path attributes.
+ # Change up the whitespace a bit.
+ s = 'color =blue; domain= example.com ; path = /toplevel '
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/toplevel')
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue; Domain=example.com; Path=/toplevel")
+ self.assertEqual(cookie.http_cookie(), "color=blue;")
+
+ # 2 cookies, various attributes
+ s = 'color=blue; Max-Age=3600; temperature=hot; HttpOnly'
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 2)
+ cookie = cookies[0]
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, 3600)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue; Max-Age=3600")
+ self.assertEqual(cookie.http_cookie(), "color=blue;")
+ cookie = cookies[1]
+ self.assertEqual(cookie.key, 'temperature')
+ self.assertEqual(cookie.value, 'hot')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, True)
+ self.assertEqual(str(cookie), "temperature=hot; HttpOnly")
+ self.assertEqual(cookie.http_cookie(), "temperature=hot;")
+
+class TestExpires(unittest.TestCase):
+
+ def setUp(self):
+ # Force microseconds to zero because cookie timestamps only have second resolution
+ self.now = datetime.datetime.utcnow().replace(microsecond=0)
+ self.now_timestamp = calendar.timegm(self.now.utctimetuple())
+ self.now_string = datetime.datetime.strftime(self.now, '%a, %d %b %Y %H:%M:%S GMT')
+
+ self.max_age = 3600 # 1 hour
+ self.age_expiration = self.now + datetime.timedelta(seconds=self.max_age)
+ self.age_string = datetime.datetime.strftime(self.age_expiration, '%a, %d %b %Y %H:%M:%S GMT')
+
+ self.expires = self.now + datetime.timedelta(days=1) # 1 day
+ self.expires_timestamp = calendar.timegm(self.expires.utctimetuple())
+ self.expires_string = datetime.datetime.strftime(self.expires, '%a, %d %b %Y %H:%M:%S GMT')
+
+ def test_expires(self):
+ # 1 cookie with name/value and no Max-Age and no Expires
+ s = 'color=blue;'
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ # Force timestamp to known value
+ cookie.timestamp = self.now
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue")
+ self.assertEqual(cookie.get_expiration(), None)
+ # Normalize
+ self.assertEqual(cookie.normalize_expiration(), None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(str(cookie), "color=blue")
+
+ # 1 cookie with name/value and Max-Age
+ s = 'color=blue; max-age=%d' % (self.max_age)
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ # Force timestamp to known value
+ cookie.timestamp = self.now
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, self.max_age)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue; Max-Age=%d" % (self.max_age))
+ self.assertEqual(cookie.get_expiration(), self.age_expiration)
+ # Normalize
+ self.assertEqual(cookie.normalize_expiration(), self.age_expiration)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, self.age_expiration)
+ self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.age_string))
+
+
+ # 1 cookie with name/value and Expires
+ s = 'color=blue; Expires=%s' % (self.expires_string)
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ # Force timestamp to known value
+ cookie.timestamp = self.now
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, self.expires)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.expires_string))
+ self.assertEqual(cookie.get_expiration(), self.expires)
+ # Normalize
+ self.assertEqual(cookie.normalize_expiration(), self.expires)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, self.expires)
+ self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.expires_string))
+
+ # 1 cookie with name/value witht both Max-Age and Expires, Max-Age takes precedence
+ s = 'color=blue; Expires=%s; max-age=%d' % (self.expires_string, self.max_age)
+ cookies = Cookie.parse(s)
+ self.assertEqual(len(cookies), 1)
+ cookie = cookies[0]
+ # Force timestamp to known value
+ cookie.timestamp = self.now
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, self.max_age)
+ self.assertEqual(cookie.expires, self.expires)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue; Max-Age=%d; Expires=%s" % (self.max_age, self.expires_string))
+ self.assertEqual(cookie.get_expiration(), self.age_expiration)
+ # Normalize
+ self.assertEqual(cookie.normalize_expiration(), self.age_expiration)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, self.age_expiration)
+ self.assertEqual(str(cookie), "color=blue; Expires=%s" % (self.age_string))
+
+ # Verify different types can be assigned to the timestamp and
+ # expires attribute.
+
+ cookie = Cookie('color', 'blue')
+ cookie.timestamp = self.now
+ self.assertEqual(cookie.timestamp, self.now)
+ cookie.timestamp = self.now_timestamp
+ self.assertEqual(cookie.timestamp, self.now)
+ cookie.timestamp = self.now_string
+ self.assertEqual(cookie.timestamp, self.now)
+
+ self.assertEqual(cookie.expires, None)
+
+ cookie.expires = self.expires
+ self.assertEqual(cookie.expires, self.expires)
+ cookie.expires = self.expires_timestamp
+ self.assertEqual(cookie.expires, self.expires)
+ cookie.expires = self.expires_string
+ self.assertEqual(cookie.expires, self.expires)
+
+class TestInvalidAttributes(unittest.TestCase):
+ def test_invalid(self):
+ # Invalid Max-Age
+ s = 'color=blue; Max-Age=over-the-hill'
+ with self.assertRaises(ValueError):
+ cookies = Cookie.parse(s)
+
+ cookie = Cookie('color', 'blue')
+ with self.assertRaises(ValueError):
+ cookie.max_age = 'over-the-hill'
+
+ # Invalid Expires
+ s = 'color=blue; Expires=Sun, 06 Xxx 1994 08:49:37 GMT'
+ with self.assertRaises(ValueError):
+ cookies = Cookie.parse(s)
+
+ cookie = Cookie('color', 'blue')
+ with self.assertRaises(ValueError):
+ cookie.expires = 'Sun, 06 Xxx 1994 08:49:37 GMT'
+
+
+class TestAttributes(unittest.TestCase):
+ def test_attributes(self):
+ cookie = Cookie('color', 'blue')
+ self.assertEqual(cookie.key, 'color')
+ self.assertEqual(cookie.value, 'blue')
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+ self.assertEqual(cookie.max_age, None)
+ self.assertEqual(cookie.expires, None)
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(cookie.httponly, None)
+
+ cookie.domain = 'example.com'
+ self.assertEqual(cookie.domain, 'example.com')
+ cookie.domain = None
+ self.assertEqual(cookie.domain, None)
+
+ cookie.path = '/toplevel'
+ self.assertEqual(cookie.path, '/toplevel')
+ cookie.path = None
+ self.assertEqual(cookie.path, None)
+
+ cookie.max_age = 400
+ self.assertEqual(cookie.max_age, 400)
+ cookie.max_age = None
+ self.assertEqual(cookie.max_age, None)
+
+ cookie.expires = 'Sun, 06 Nov 1994 08:49:37 GMT'
+ self.assertEqual(cookie.expires, datetime.datetime(1994, 11, 6, 8, 49, 37))
+ cookie.expires = None
+ self.assertEqual(cookie.expires, None)
+
+ cookie.secure = True
+ self.assertEqual(cookie.secure, True)
+ self.assertEqual(str(cookie), "color=blue; Secure")
+ cookie.secure = False
+ self.assertEqual(cookie.secure, False)
+ self.assertEqual(str(cookie), "color=blue")
+ cookie.secure = None
+ self.assertEqual(cookie.secure, None)
+ self.assertEqual(str(cookie), "color=blue")
+
+ cookie.httponly = True
+ self.assertEqual(cookie.httponly, True)
+ self.assertEqual(str(cookie), "color=blue; HttpOnly")
+ cookie.httponly = False
+ self.assertEqual(cookie.httponly, False)
+ self.assertEqual(str(cookie), "color=blue")
+ cookie.httponly = None
+ self.assertEqual(cookie.httponly, None)
+ self.assertEqual(str(cookie), "color=blue")
+
+
+class TestHTTPReturn(unittest.TestCase):
+ def setUp(self):
+ self.url = 'http://www.foo.bar.com/one/two'
+
+ def test_no_attributes(self):
+ cookie = Cookie('color', 'blue')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ def test_domain(self):
+ cookie = Cookie('color', 'blue', domain='www.foo.bar.com')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', domain='.foo.bar.com')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', domain='.bar.com')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', domain='bar.com')
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', domain='bogus.com')
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', domain='www.foo.bar.com')
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok('http://192.168.1.1/one/two'))
+
+ def test_path(self):
+ cookie = Cookie('color', 'blue')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', path='/')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', path='/one')
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ cookie = Cookie('color', 'blue', path='/oneX')
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ def test_expires(self):
+ now = datetime.datetime.utcnow().replace(microsecond=0)
+
+ # expires 1 day from now
+ expires = now + datetime.timedelta(days=1)
+
+ cookie = Cookie('color', 'blue', expires=expires)
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+ # expired 1 day ago
+ expires = now + datetime.timedelta(days=-1)
+ cookie = Cookie('color', 'blue', expires=expires)
+ with self.assertRaises(Cookie.Expired):
+ self.assertTrue(cookie.http_return_ok(self.url))
+
+
+ def test_httponly(self):
+ cookie = Cookie('color', 'blue', httponly=True)
+ self.assertTrue(cookie.http_return_ok('http://example.com'))
+ self.assertTrue(cookie.http_return_ok('https://example.com'))
+
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok('ftp://example.com'))
+
+ def test_secure(self):
+ cookie = Cookie('color', 'blue', secure=True)
+ self.assertTrue(cookie.http_return_ok('https://Xexample.com'))
+
+ with self.assertRaises(Cookie.URLMismatch):
+ self.assertTrue(cookie.http_return_ok('http://Xexample.com'))
+
+class TestNormalization(unittest.TestCase):
+ def setUp(self):
+ # Force microseconds to zero because cookie timestamps only have second resolution
+ self.now = datetime.datetime.utcnow().replace(microsecond=0)
+ self.now_timestamp = calendar.timegm(self.now.utctimetuple())
+ self.now_string = datetime.datetime.strftime(self.now, '%a, %d %b %Y %H:%M:%S GMT')
+
+ self.max_age = 3600 # 1 hour
+ self.age_expiration = self.now + datetime.timedelta(seconds=self.max_age)
+ self.age_string = datetime.datetime.strftime(self.age_expiration, '%a, %d %b %Y %H:%M:%S GMT')
+
+ self.expires = self.now + datetime.timedelta(days=1) # 1 day
+ self.expires_timestamp = calendar.timegm(self.expires.utctimetuple())
+ self.expires_string = datetime.datetime.strftime(self.expires, '%a, %d %b %Y %H:%M:%S GMT')
+
+ def test_path_normalization(self):
+ self.assertEqual(Cookie.normalize_url_path(''), '/')
+ self.assertEqual(Cookie.normalize_url_path('foo'), '/')
+ self.assertEqual(Cookie.normalize_url_path('foo/'), '/')
+ self.assertEqual(Cookie.normalize_url_path('/foo'), '/')
+ self.assertEqual(Cookie.normalize_url_path('/foo/'), '/foo')
+ self.assertEqual(Cookie.normalize_url_path('/Foo/bar'), '/foo')
+ self.assertEqual(Cookie.normalize_url_path('/foo/baR/'), '/foo/bar')
+
+ def test_normalization(self):
+ cookie = Cookie('color', 'blue', expires=self.expires)
+ cookie.timestamp = self.now_timestamp
+
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+
+ url = 'http://example.COM/foo'
+ cookie.normalize(url)
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/')
+ self.assertEqual(cookie.expires, self.expires)
+
+ cookie = Cookie('color', 'blue', max_age=self.max_age)
+ cookie.timestamp = self.now_timestamp
+
+ self.assertEqual(cookie.domain, None)
+ self.assertEqual(cookie.path, None)
+
+ url = 'http://example.com/foo/'
+ cookie.normalize(url)
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/foo')
+ self.assertEqual(cookie.expires, self.age_expiration)
+
+ cookie = Cookie('color', 'blue')
+ url = 'http://example.com/foo'
+ cookie.normalize(url)
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/')
+
+ cookie = Cookie('color', 'blue')
+ url = 'http://example.com/foo/bar'
+ cookie.normalize(url)
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/foo')
+
+ cookie = Cookie('color', 'blue')
+ url = 'http://example.com/foo/bar/'
+ cookie.normalize(url)
+ self.assertEqual(cookie.domain, 'example.com')
+ self.assertEqual(cookie.path, '/foo/bar')
+
+
+#-------------------------------------------------------------------------------
+if __name__ == '__main__':
+ unittest.main()