summaryrefslogtreecommitdiffstats
path: root/keystone/tests
diff options
context:
space:
mode:
Diffstat (limited to 'keystone/tests')
-rw-r--r--keystone/tests/_ldap_livetest.py11
-rw-r--r--keystone/tests/backend_multi_ldap_sql.conf35
-rw-r--r--keystone/tests/core.py16
-rw-r--r--keystone/tests/keystone.Default.conf14
-rw-r--r--keystone/tests/keystone.domain1.conf11
-rw-r--r--keystone/tests/keystone.domain2.conf13
-rw-r--r--keystone/tests/test_backend.py11
-rw-r--r--keystone/tests/test_backend_ldap.py330
-rw-r--r--keystone/tests/test_backend_sql.py2
-rw-r--r--keystone/tests/test_drivers.py5
-rw-r--r--keystone/tests/test_keystoneclient.py55
-rw-r--r--keystone/tests/test_overrides.conf3
-rw-r--r--keystone/tests/test_s3_token_middleware.py4
-rw-r--r--keystone/tests/test_sql_migrate_extensions.py63
-rw-r--r--keystone/tests/test_sql_upgrade.py40
-rw-r--r--keystone/tests/test_v3_auth.py61
-rw-r--r--keystone/tests/test_v3_oauth1.py574
17 files changed, 1204 insertions, 44 deletions
diff --git a/keystone/tests/_ldap_livetest.py b/keystone/tests/_ldap_livetest.py
index 59da4e66..4562ccb6 100644
--- a/keystone/tests/_ldap_livetest.py
+++ b/keystone/tests/_ldap_livetest.py
@@ -87,9 +87,6 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
def tearDown(self):
test.TestCase.tearDown(self)
- def test_user_enable_attribute_mask(self):
- self.skipTest('Test is for Active Directory Only')
-
def test_ldap_dereferencing(self):
alt_users_ldif = {'objectclass': ['top', 'organizationalUnit'],
'ou': 'alt_users'}
@@ -158,3 +155,11 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
alias_dereferencing=deref)
self.assertEqual(ldap.DEREF_SEARCHING,
ldap_wrapper.conn.get_option(ldap.OPT_DEREF))
+
+ def test_user_enable_attribute_mask(self):
+ CONF.ldap.user_enabled_emulation = False
+ CONF.ldap.user_enabled_attribute = 'employeeType'
+ super(LiveLDAPIdentity, self).test_user_enable_attribute_mask()
+
+ def test_create_unicode_user_name(self):
+ self.skipTest('Addressed by bug #1172106')
diff --git a/keystone/tests/backend_multi_ldap_sql.conf b/keystone/tests/backend_multi_ldap_sql.conf
new file mode 100644
index 00000000..59cff761
--- /dev/null
+++ b/keystone/tests/backend_multi_ldap_sql.conf
@@ -0,0 +1,35 @@
+[sql]
+connection = sqlite://
+#For a file based sqlite use
+#connection = sqlite:////tmp/keystone.db
+#To Test MySQL:
+#connection = mysql://keystone:keystone@localhost/keystone?charset=utf8
+#To Test PostgreSQL:
+#connection = postgresql://keystone:keystone@localhost/keystone?client_encoding=utf8
+idle_timeout = 200
+
+[identity]
+# common identity backend is SQL, domain specific configs will
+# set their backends to ldap
+driver = keystone.identity.backends.sql.Identity
+# The test setup will set this to True, to allow easier creation
+# of initial domain data
+# domain_specific_drivers_enabled = True
+
+[assignment]
+driver = keystone.assignment.backends.sql.Assignment
+
+[token]
+driver = keystone.token.backends.sql.Token
+
+[ec2]
+driver = keystone.contrib.ec2.backends.sql.Ec2
+
+[catalog]
+driver = keystone.catalog.backends.sql.Catalog
+
+[policy]
+driver = keystone.policy.backends.sql.Policy
+
+[trust]
+driver = keystone.trust.backends.sql.Trust
diff --git a/keystone/tests/core.py b/keystone/tests/core.py
index 21dc61dc..cba6cbf8 100644
--- a/keystone/tests/core.py
+++ b/keystone/tests/core.py
@@ -40,15 +40,16 @@ from keystone import assignment
from keystone import catalog
from keystone.common import dependency
from keystone.common import kvs
-from keystone.common import logging
from keystone.common import sql
from keystone.common import utils
from keystone.common import wsgi
from keystone import config
from keystone.contrib import ec2
+from keystone.contrib import oauth1
from keystone import credential
from keystone import exception
from keystone import identity
+from keystone.openstack.common import log as logging
from keystone.openstack.common import timeutils
from keystone import policy
from keystone import token
@@ -68,9 +69,6 @@ CONF = config.CONF
cd = os.chdir
-logging.getLogger('routes.middleware').level = logging.WARN
-
-
def rootdir(*p):
return os.path.join(ROOTDIR, *p)
@@ -271,7 +269,7 @@ class TestCase(NoModule, unittest.TestCase):
# assignment manager gets the default assignment driver from the
# identity driver.
for manager in [identity, assignment, catalog, credential, ec2, policy,
- token, token_provider, trust]:
+ token, token_provider, trust, oauth1]:
# manager.__name__ is like keystone.xxx[.yyy],
# converted to xxx[_yyy]
manager_name = ('%s_api' %
@@ -295,9 +293,11 @@ class TestCase(NoModule, unittest.TestCase):
for domain in fixtures.DOMAINS:
try:
rv = self.identity_api.create_domain(domain['id'], domain)
- except (exception.Conflict, exception.NotImplemented):
- pass
- setattr(self, 'domain_%s' % domain['id'], domain)
+ except exception.Conflict:
+ rv = self.identity_api.get_domain(domain['id'])
+ except exception.NotImplemented:
+ rv = domain
+ setattr(self, 'domain_%s' % domain['id'], rv)
for tenant in fixtures.TENANTS:
try:
diff --git a/keystone/tests/keystone.Default.conf b/keystone/tests/keystone.Default.conf
new file mode 100644
index 00000000..7049afed
--- /dev/null
+++ b/keystone/tests/keystone.Default.conf
@@ -0,0 +1,14 @@
+# The domain-specific configuration file for the default domain for
+# use with unit tests.
+#
+# The domain_name of the default domain is 'Default', hence the
+# strange mix of upper/lower case in the file name.
+
+[ldap]
+url = fake://memory
+user = cn=Admin
+password = password
+suffix = cn=example,cn=com
+
+[identity]
+driver = keystone.identity.backends.ldap.Identity \ No newline at end of file
diff --git a/keystone/tests/keystone.domain1.conf b/keystone/tests/keystone.domain1.conf
new file mode 100644
index 00000000..6b7e2488
--- /dev/null
+++ b/keystone/tests/keystone.domain1.conf
@@ -0,0 +1,11 @@
+# The domain-specific configuration file for the test domain
+# 'domain1' for use with unit tests.
+
+[ldap]
+url = fake://memory1
+user = cn=Admin
+password = password
+suffix = cn=example,cn=com
+
+[identity]
+driver = keystone.identity.backends.ldap.Identity \ No newline at end of file
diff --git a/keystone/tests/keystone.domain2.conf b/keystone/tests/keystone.domain2.conf
new file mode 100644
index 00000000..0ed68eb9
--- /dev/null
+++ b/keystone/tests/keystone.domain2.conf
@@ -0,0 +1,13 @@
+# The domain-specific configuration file for the test domain
+# 'domain2' for use with unit tests.
+
+[ldap]
+url = fake://memory
+user = cn=Admin
+password = password
+suffix = cn=myroot,cn=com
+group_tree_dn = ou=UserGroups,dc=myroot,dc=org
+user_tree_dn = ou=Users,dc=myroot,dc=org
+
+[identity]
+driver = keystone.identity.backends.ldap.Identity \ No newline at end of file
diff --git a/keystone/tests/test_backend.py b/keystone/tests/test_backend.py
index 8622b10d..8013deec 100644
--- a/keystone/tests/test_backend.py
+++ b/keystone/tests/test_backend.py
@@ -105,7 +105,9 @@ class IdentityTests(object):
self.assertIn(CONF.member_role_id, role_list)
def test_password_hashed(self):
- user_ref = self.identity_api._get_user(self.user_foo['id'])
+ driver = self.identity_api._select_identity_driver(
+ self.user_foo['domain_id'])
+ user_ref = driver._get_user(self.user_foo['id'])
self.assertNotEqual(user_ref['password'], self.user_foo['password'])
def test_create_unicode_user_name(self):
@@ -1521,7 +1523,8 @@ class IdentityTests(object):
self.assertRaises(exception.UserNotFound,
self.identity_api.update_user,
user_id,
- {'id': user_id})
+ {'id': user_id,
+ 'domain_id': DEFAULT_DOMAIN_ID})
def test_delete_user_with_project_association(self):
user = {'id': uuid.uuid4().hex,
@@ -1628,7 +1631,7 @@ class IdentityTests(object):
tenant)
def test_create_user_long_name_fails(self):
- user = {'id': 'fake1', 'name': 'a' * 65,
+ user = {'id': 'fake1', 'name': 'a' * 256,
'domain_id': DEFAULT_DOMAIN_ID}
self.assertRaises(exception.ValidationError,
self.identity_api.create_user,
@@ -1701,7 +1704,7 @@ class IdentityTests(object):
user = {'id': 'fake1', 'name': 'fake1',
'domain_id': DEFAULT_DOMAIN_ID}
self.identity_api.create_user('fake1', user)
- user['name'] = 'a' * 65
+ user['name'] = 'a' * 256
self.assertRaises(exception.ValidationError,
self.identity_api.update_user,
'fake1',
diff --git a/keystone/tests/test_backend_ldap.py b/keystone/tests/test_backend_ldap.py
index 9c1c98d5..23379712 100644
--- a/keystone/tests/test_backend_ldap.py
+++ b/keystone/tests/test_backend_ldap.py
@@ -17,6 +17,8 @@
import uuid
+import ldap
+
from keystone import assignment
from keystone.common.ldap import fakeldap
from keystone.common import sql
@@ -38,8 +40,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
return self.identity_api.get_domain(CONF.identity.default_domain_id)
def clear_database(self):
- db = fakeldap.FakeShelve().get_instance()
- db.clear()
+ for shelf in fakeldap.FakeShelves:
+ fakeldap.FakeShelves[shelf].clear()
+
+ def reload_backends(self, domain_id):
+ # Only one backend unless we are using separate domain backends
+ self.load_backends()
+
+ def get_config(self, domain_id):
+ # Only one conf structure unless we are using separate domain backends
+ return CONF
def _set_config(self):
self.config([test.etcdir('keystone.conf.sample'),
@@ -57,6 +67,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
user = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass1',
+ 'domain_id': CONF.identity.default_domain_id,
'tenants': ['bar']}
self.identity_api.create_user('fake1', user)
user_ref = self.identity_api.get_user('fake1')
@@ -71,14 +82,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
'fake1')
def test_configurable_forbidden_user_actions(self):
- CONF.ldap.user_allow_create = False
- CONF.ldap.user_allow_update = False
- CONF.ldap.user_allow_delete = False
- self.load_backends()
+ conf = self.get_config(CONF.identity.default_domain_id)
+ conf.ldap.user_allow_create = False
+ conf.ldap.user_allow_update = False
+ conf.ldap.user_allow_delete = False
+ self.reload_backends(CONF.identity.default_domain_id)
user = {'id': 'fake1',
'name': 'fake1',
'password': 'fakepass1',
+ 'domain_id': CONF.identity.default_domain_id,
'tenants': ['bar']}
self.assertRaises(exception.ForbiddenAction,
self.identity_api.create_user,
@@ -100,8 +113,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.user_foo.pop('password')
self.assertDictEqual(user_ref, self.user_foo)
- CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)'
- self.load_backends()
+ conf = self.get_config(user_ref['domain_id'])
+ conf.ldap.user_filter = '(CN=DOES_NOT_MATCH)'
+ self.reload_backends(user_ref['domain_id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
self.user_foo['id'])
@@ -205,18 +219,21 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# Create a group
group_id = None
- group = dict(name=uuid.uuid4().hex)
+ group = dict(name=uuid.uuid4().hex,
+ domain_id=CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group_id, group)['id']
# Create a couple of users and add them to the group.
user_id = None
- user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
+ user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex,
+ domain_id=CONF.identity.default_domain_id)
user_1_id = self.identity_api.create_user(user_id, user)['id']
self.identity_api.add_user_to_group(user_1_id, group_id)
user_id = None
- user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex)
+ user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex,
+ domain_id=CONF.identity.default_domain_id)
user_2_id = self.identity_api.create_user(user_id, user)['id']
self.identity_api.add_user_to_group(user_2_id, group_id)
@@ -224,7 +241,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# Delete user 2
# NOTE(blk-u): need to go directly to user interface to keep from
# updating the group.
- self.identity_api.driver.user.delete(user_2_id)
+ driver = self.identity_api._select_identity_driver(
+ user['domain_id'])
+ driver.user.delete(user_2_id)
# List group users and verify only user 1.
res = self.identity_api.list_users_in_group(group_id)
@@ -249,13 +268,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.identity_api.create_user(user['id'], user)
self.identity_api.add_user_to_project(self.tenant_baz['id'],
user['id'])
- self.identity_api.driver.user.LDAP_USER = None
- self.identity_api.driver.user.LDAP_PASSWORD = None
+ driver = self.identity_api._select_identity_driver(
+ user['domain_id'])
+ driver.user.LDAP_USER = None
+ driver.user.LDAP_PASSWORD = None
self.assertRaises(AssertionError,
self.identity_api.authenticate,
user_id=user['id'],
- password=None)
+ password=None,
+ domain_scope=user['domain_id'])
# (spzala)The group and domain crud tests below override the standard ones
# in test_backend.py so that we can exclude the update name test, since we
@@ -454,24 +476,56 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity):
self.assertNotIn('name', role_ref)
def test_user_enable_attribute_mask(self):
- CONF.ldap.user_enabled_attribute = 'enabled'
CONF.ldap.user_enabled_mask = 2
- CONF.ldap.user_enabled_default = 512
+ CONF.ldap.user_enabled_default = '512'
self.clear_database()
- user = {'id': 'fake1', 'name': 'fake1', 'enabled': True}
- self.identity_api.create_user('fake1', user)
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
+ ldap_ = self.identity_api.driver.user.get_connection()
+
+ def get_enabled_vals():
+ user_dn = self.identity_api.driver.user._id_to_dn_string('fake1')
+ enabled_attr_name = CONF.ldap.user_enabled_attribute
+
+ res = ldap_.search_s(user_dn,
+ ldap.SCOPE_BASE,
+ query='(sn=fake1)')
+ return res[0][1][enabled_attr_name]
+
+ user = {'id': 'fake1', 'name': 'fake1', 'enabled': True,
+ 'domain_id': CONF.identity.default_domain_id}
+
+ user_ref = self.identity_api.create_user('fake1', user)
+
+ self.assertEqual(user_ref['enabled'], 512)
+ # TODO(blk-u): 512 seems wrong, should it be True?
+
+ enabled_vals = get_enabled_vals()
+ self.assertEqual(enabled_vals, [512])
+
user_ref = self.identity_api.get_user('fake1')
- self.assertEqual(user_ref['enabled'], True)
+ self.assertIs(user_ref['enabled'], True)
user['enabled'] = False
- self.identity_api.update_user('fake1', user)
+ user_ref = self.identity_api.update_user('fake1', user)
+ self.assertIs(user_ref['enabled'], False)
+
+ enabled_vals = get_enabled_vals()
+ self.assertEqual(enabled_vals, [514])
+
user_ref = self.identity_api.get_user('fake1')
- self.assertEqual(user_ref['enabled'], False)
+ self.assertIs(user_ref['enabled'], False)
user['enabled'] = True
- self.identity_api.update_user('fake1', user)
+ user_ref = self.identity_api.update_user('fake1', user)
+ self.assertIs(user_ref['enabled'], True)
+
+ enabled_vals = get_enabled_vals()
+ self.assertEqual(enabled_vals, [512])
+
user_ref = self.identity_api.get_user('fake1')
- self.assertEqual(user_ref['enabled'], True)
+ self.assertIs(user_ref['enabled'], True)
def test_user_api_get_connection_no_user_password(self):
"""Don't bind in case the user and password are blank."""
@@ -510,6 +564,7 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity):
'id': 'extra_attributes',
'name': 'EXTRA_ATTRIBUTES',
'password': 'extra',
+ 'domain_id': CONF.identity.default_domain_id
}
self.identity_api.create_user(user['id'], user)
dn, attrs = self.identity_api.driver.user._ldap_get(user['id'])
@@ -743,3 +798,230 @@ class LdapIdentitySqlAssignment(sql.Base, test.TestCase, BaseLDAPIdentity):
def test_role_filter(self):
self.skipTest(
'N/A: Not part of SQL backend')
+
+
+class MultiLDAPandSQLIdentity(sql.Base, test.TestCase, BaseLDAPIdentity):
+ """Class to test common SQL plus individual LDAP backends.
+
+ We define a set of domains and domain-specific backends:
+
+ - A separate LDAP backend for the default domain
+ - A separate LDAP backend for domain1
+ - domain2 shares the same LDAP as domain1, but uses a different
+ tree attach point
+ - An SQL backend for all other domains (which will include domain3
+ and domain4)
+
+ Normally one would expect that the default domain would be handled as
+ part of the "other domains" - however the above provides better
+ test coverage since most of the existing backend tests use the default
+ domain.
+
+ """
+ def setUp(self):
+ super(MultiLDAPandSQLIdentity, self).setUp()
+
+ self._set_config()
+ self.load_backends()
+ self.engine = self.get_engine()
+ sql.ModelBase.metadata.create_all(bind=self.engine)
+ self._setup_domain_test_data()
+
+ # All initial domain data setup complete, time to switch on support
+ # for separate backends per domain.
+
+ self.orig_config_domains_enabled = (
+ config.CONF.identity.domain_specific_drivers_enabled)
+ self.opt_in_group('identity', domain_specific_drivers_enabled=True)
+ self.orig_config_dir = (
+ config.CONF.identity.domain_config_dir)
+ self.opt_in_group('identity', domain_config_dir=test.TESTSDIR)
+ self._set_domain_configs()
+ self.clear_database()
+ self.load_fixtures(default_fixtures)
+
+ def tearDown(self):
+ super(MultiLDAPandSQLIdentity, self).tearDown()
+ self.opt_in_group(
+ 'identity',
+ domain_config_dir=self.orig_config_dir)
+ self.opt_in_group(
+ 'identity',
+ domain_specific_drivers_enabled=self.orig_config_domains_enabled)
+ sql.ModelBase.metadata.drop_all(bind=self.engine)
+ self.engine.dispose()
+ sql.set_global_engine(None)
+
+ def _set_config(self):
+ self.config([test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_multi_ldap_sql.conf')])
+
+ def _setup_domain_test_data(self):
+
+ def create_domain(domain):
+ try:
+ ref = self.assignment_api.create_domain(
+ domain['id'], domain)
+ except exception.Conflict:
+ ref = (
+ self.assignment_api.get_domain_by_name(domain['name']))
+ return ref
+
+ self.domain_default = create_domain(assignment.DEFAULT_DOMAIN)
+ self.domain1 = create_domain(
+ {'id': uuid.uuid4().hex, 'name': 'domain1'})
+ self.domain2 = create_domain(
+ {'id': uuid.uuid4().hex, 'name': 'domain2'})
+ self.domain3 = create_domain(
+ {'id': uuid.uuid4().hex, 'name': 'domain3'})
+ self.domain4 = create_domain(
+ {'id': uuid.uuid4().hex, 'name': 'domain4'})
+
+ def _set_domain_configs(self):
+ # We need to load the domain configs explicitly to ensure the
+ # test overrides are included.
+ self.identity_api.domain_configs._load_config(
+ self.identity_api.assignment_api,
+ [test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_multi_ldap_sql.conf'),
+ test.testsdir('keystone.Default.conf')],
+ 'Default')
+ self.identity_api.domain_configs._load_config(
+ self.identity_api.assignment_api,
+ [test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_multi_ldap_sql.conf'),
+ test.testsdir('keystone.domain1.conf')],
+ 'domain1')
+ self.identity_api.domain_configs._load_config(
+ self.identity_api.assignment_api,
+ [test.etcdir('keystone.conf.sample'),
+ test.testsdir('test_overrides.conf'),
+ test.testsdir('backend_multi_ldap_sql.conf'),
+ test.testsdir('keystone.domain2.conf')],
+ 'domain2')
+
+ def reload_backends(self, domain_id):
+ # Just reload the driver for this domain - which will pickup
+ # any updated cfg
+ self.identity_api.domain_configs.reload_domain_driver(
+ self.identity_api.assignment_api, domain_id)
+
+ def get_config(self, domain_id):
+ # Get the config for this domain, will return CONF
+ # if no specific config defined for this domain
+ return self.identity_api.domain_configs.get_domain_conf(domain_id)
+
+ def test_list_domains(self):
+ self.skipTest(
+ 'N/A: Not relevant for multi ldap testing')
+
+ def test_domain_segregation(self):
+ """Test that separate configs have segregated the domain.
+
+ Test Plan:
+ - Create a user in each of the domains
+ - Make sure that you can only find a given user in its
+ relevant domain
+ - Make sure that for a backend that supports multiple domains
+ you can get the users via any of the domain scopes
+
+ """
+ def create_user(domain_id):
+ user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': domain_id,
+ 'password': uuid.uuid4().hex,
+ 'enabled': True}
+ self.identity_api.create_user(user['id'], user)
+ return user
+
+ userd = create_user(CONF.identity.default_domain_id)
+ user1 = create_user(self.domain1['id'])
+ user2 = create_user(self.domain2['id'])
+ user3 = create_user(self.domain3['id'])
+ user4 = create_user(self.domain4['id'])
+
+ # Now check that I can read user1 with the appropriate domain
+ # scope, but won't find it if the wrong scope is used
+
+ ref = self.identity_api.get_user(
+ userd['id'], domain_scope=CONF.identity.default_domain_id)
+ del userd['password']
+ self.assertDictEqual(ref, userd)
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ userd['id'],
+ domain_scope=self.domain1['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ userd['id'],
+ domain_scope=self.domain2['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ userd['id'],
+ domain_scope=self.domain3['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ userd['id'],
+ domain_scope=self.domain4['id'])
+
+ ref = self.identity_api.get_user(
+ user1['id'], domain_scope=self.domain1['id'])
+ del user1['password']
+ self.assertDictEqual(ref, user1)
+ ref = self.identity_api.get_user(
+ user2['id'], domain_scope=self.domain2['id'])
+ del user2['password']
+ self.assertDictEqual(ref, user2)
+
+ # Domains 3 and 4 share the same backend, so you should be
+ # able to see user3 and 4 from either
+
+ ref = self.identity_api.get_user(
+ user3['id'], domain_scope=self.domain3['id'])
+ del user3['password']
+ self.assertDictEqual(ref, user3)
+ ref = self.identity_api.get_user(
+ user4['id'], domain_scope=self.domain4['id'])
+ del user4['password']
+ self.assertDictEqual(ref, user4)
+ ref = self.identity_api.get_user(
+ user3['id'], domain_scope=self.domain4['id'])
+ self.assertDictEqual(ref, user3)
+ ref = self.identity_api.get_user(
+ user4['id'], domain_scope=self.domain3['id'])
+ self.assertDictEqual(ref, user4)
+
+ def test_scanning_of_config_dir(self):
+ """Test the Manager class scans the config directory.
+
+ The setup for the main tests above load the domain configs directly
+ so that the test overrides can be included. This test just makes sure
+ that the standard config directory scanning does pick up the relevant
+ domain config files.
+
+ """
+ # Confirm that config has drivers_enabled as True, which we will
+ # check has been set to False later in this test
+ self.assertTrue(config.CONF.identity.domain_specific_drivers_enabled)
+ self.load_backends()
+ # Execute any command to trigger the lazy loading of domain configs
+ self.identity_api.list_users(domain_scope=self.domain1['id'])
+ # ...and now check the domain configs have been set up
+ self.assertIn('default', self.identity_api.domain_configs)
+ self.assertIn(self.domain1['id'], self.identity_api.domain_configs)
+ self.assertIn(self.domain2['id'], self.identity_api.domain_configs)
+ self.assertNotIn(self.domain3['id'], self.identity_api.domain_configs)
+ self.assertNotIn(self.domain4['id'], self.identity_api.domain_configs)
+
+ # Finally check that a domain specific config contains items from both
+ # the primary config and the domain specific config
+ conf = self.identity_api.domain_configs.get_domain_conf(
+ self.domain1['id'])
+ # This should now be false, as is the default, since this is not
+ # set in the standard primary config file
+ self.assertFalse(conf.identity.domain_specific_drivers_enabled)
+ # ..and make sure a domain-specifc options is also set
+ self.assertEqual(conf.ldap.url, 'fake://memory1')
diff --git a/keystone/tests/test_backend_sql.py b/keystone/tests/test_backend_sql.py
index 773ae862..24159eb6 100644
--- a/keystone/tests/test_backend_sql.py
+++ b/keystone/tests/test_backend_sql.py
@@ -81,7 +81,7 @@ class SqlModels(SqlTests):
def test_user_model(self):
cols = (('id', sql.String, 64),
- ('name', sql.String, 64),
+ ('name', sql.String, 255),
('password', sql.String, 128),
('domain_id', sql.String, 64),
('enabled', sql.Boolean, None),
diff --git a/keystone/tests/test_drivers.py b/keystone/tests/test_drivers.py
index c83c1a89..888b365c 100644
--- a/keystone/tests/test_drivers.py
+++ b/keystone/tests/test_drivers.py
@@ -3,6 +3,7 @@ import unittest2 as unittest
from keystone import assignment
from keystone import catalog
+from keystone.contrib import oauth1
from keystone import exception
from keystone import identity
from keystone import policy
@@ -55,3 +56,7 @@ class TestDrivers(unittest.TestCase):
def test_token_driver_unimplemented(self):
interface = token.Driver()
self.assertInterfaceNotImplemented(interface)
+
+ def test_oauth1_driver_unimplemented(self):
+ interface = oauth1.Driver()
+ self.assertInterfaceNotImplemented(interface)
diff --git a/keystone/tests/test_keystoneclient.py b/keystone/tests/test_keystoneclient.py
index 7e59885d..ff2462f4 100644
--- a/keystone/tests/test_keystoneclient.py
+++ b/keystone/tests/test_keystoneclient.py
@@ -378,6 +378,46 @@ class KeystoneClientTests(object):
client.tokens.authenticate,
token=token_id)
+ def test_disable_tenant_invalidates_token(self):
+ from keystoneclient import exceptions as client_exceptions
+
+ admin_client = self.get_client(admin=True)
+ foo_client = self.get_client(self.user_foo)
+ tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])
+
+ # Disable the tenant.
+ tenant_bar.update(enabled=False)
+
+ # Test that the token has been removed.
+ self.assertRaises(client_exceptions.Unauthorized,
+ foo_client.tokens.authenticate,
+ token=foo_client.auth_token)
+
+ # Test that the user access has been disabled.
+ self.assertRaises(client_exceptions.Unauthorized,
+ self.get_client,
+ self.user_foo)
+
+ def test_delete_tenant_invalidates_token(self):
+ from keystoneclient import exceptions as client_exceptions
+
+ admin_client = self.get_client(admin=True)
+ foo_client = self.get_client(self.user_foo)
+ tenant_bar = admin_client.tenants.get(self.tenant_bar['id'])
+
+ # Delete the tenant.
+ tenant_bar.delete()
+
+ # Test that the token has been removed.
+ self.assertRaises(client_exceptions.Unauthorized,
+ foo_client.tokens.authenticate,
+ token=foo_client.auth_token)
+
+ # Test that the user access has been disabled.
+ self.assertRaises(client_exceptions.Unauthorized,
+ self.get_client,
+ self.user_foo)
+
def test_disable_user_invalidates_token(self):
from keystoneclient import exceptions as client_exceptions
@@ -495,6 +535,15 @@ class KeystoneClientTests(object):
user = client.users.update_tenant(
user=user, tenant=self.tenant_bar['id'])
+ def test_user_create_no_string_password(self):
+ from keystoneclient import exceptions as client_exceptions
+ client = self.get_client(admin=True)
+ self.assertRaises(client_exceptions.BadRequest,
+ client.users.create,
+ name='test_user',
+ password=12345,
+ email=uuid.uuid4().hex)
+
def test_user_create_no_name(self):
from keystoneclient import exceptions as client_exceptions
client = self.get_client(admin=True)
@@ -1165,6 +1214,12 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
def test_policy_crud(self):
self.skipTest('N/A due to lack of endpoint CRUD')
+ def test_disable_tenant_invalidates_token(self):
+ self.skipTest('N/A')
+
+ def test_delete_tenant_invalidates_token(self):
+ self.skipTest('N/A')
+
class Kc11TestCase(CompatTestCase, KeystoneClientTests):
def get_checkout(self):
diff --git a/keystone/tests/test_overrides.conf b/keystone/tests/test_overrides.conf
index aac29f26..5cd522b2 100644
--- a/keystone/tests/test_overrides.conf
+++ b/keystone/tests/test_overrides.conf
@@ -14,6 +14,9 @@ driver = keystone.trust.backends.kvs.Trust
[token]
driver = keystone.token.backends.kvs.Token
+[oauth1]
+driver = keystone.contrib.oauth1.backends.kvs.OAuth1
+
[signing]
certfile = ../../examples/pki/certs/signing_cert.pem
keyfile = ../../examples/pki/private/signing_key.pem
diff --git a/keystone/tests/test_s3_token_middleware.py b/keystone/tests/test_s3_token_middleware.py
index ec31f2ac..2d561c10 100644
--- a/keystone/tests/test_s3_token_middleware.py
+++ b/keystone/tests/test_s3_token_middleware.py
@@ -225,9 +225,9 @@ class S3TokenMiddlewareTestUtil(unittest.TestCase):
def test_split_path_invalid_path(self):
try:
s3_token.split_path('o\nn e', 2)
- except ValueError, err:
+ except ValueError as err:
self.assertEquals(str(err), 'Invalid path: o%0An%20e')
try:
s3_token.split_path('o\nn e', 2, 3, True)
- except ValueError, err:
+ except ValueError as err:
self.assertEquals(str(err), 'Invalid path: o%0An%20e')
diff --git a/keystone/tests/test_sql_migrate_extensions.py b/keystone/tests/test_sql_migrate_extensions.py
index 4a529559..f9393cbe 100644
--- a/keystone/tests/test_sql_migrate_extensions.py
+++ b/keystone/tests/test_sql_migrate_extensions.py
@@ -27,6 +27,7 @@ To run these tests against a live database:
"""
from keystone.contrib import example
+from keystone.contrib import oauth1
import test_sql_upgrade
@@ -45,3 +46,65 @@ class SqlUpgradeExampleExtension(test_sql_upgrade.SqlMigrateBase):
self.assertTableColumns('example', ['id', 'type', 'extra'])
self.downgrade(0, repository=self.repo_path)
self.assertTableDoesNotExist('example')
+
+
+class SqlUpgradeOAuth1Extension(test_sql_upgrade.SqlMigrateBase):
+ def repo_package(self):
+ return oauth1
+
+ def test_upgrade(self):
+ self.assertTableDoesNotExist('consumer')
+ self.assertTableDoesNotExist('request_token')
+ self.assertTableDoesNotExist('access_token')
+ self.upgrade(1, repository=self.repo_path)
+ self.assertTableColumns('consumer',
+ ['id',
+ 'description',
+ 'secret',
+ 'extra'])
+ self.assertTableColumns('request_token',
+ ['id',
+ 'request_secret',
+ 'verifier',
+ 'authorizing_user_id',
+ 'requested_project_id',
+ 'requested_roles',
+ 'consumer_id',
+ 'expires_at'])
+ self.assertTableColumns('access_token',
+ ['id',
+ 'access_secret',
+ 'authorizing_user_id',
+ 'project_id',
+ 'requested_roles',
+ 'consumer_id',
+ 'expires_at'])
+
+ def test_downgrade(self):
+ self.upgrade(1, repository=self.repo_path)
+ self.assertTableColumns('consumer',
+ ['id',
+ 'description',
+ 'secret',
+ 'extra'])
+ self.assertTableColumns('request_token',
+ ['id',
+ 'request_secret',
+ 'verifier',
+ 'authorizing_user_id',
+ 'requested_project_id',
+ 'requested_roles',
+ 'consumer_id',
+ 'expires_at'])
+ self.assertTableColumns('access_token',
+ ['id',
+ 'access_secret',
+ 'authorizing_user_id',
+ 'project_id',
+ 'requested_roles',
+ 'consumer_id',
+ 'expires_at'])
+ self.downgrade(0, repository=self.repo_path)
+ self.assertTableDoesNotExist('consumer')
+ self.assertTableDoesNotExist('request_token')
+ self.assertTableDoesNotExist('access_token')
diff --git a/keystone/tests/test_sql_upgrade.py b/keystone/tests/test_sql_upgrade.py
index e904d6a7..0ee63433 100644
--- a/keystone/tests/test_sql_upgrade.py
+++ b/keystone/tests/test_sql_upgrade.py
@@ -556,6 +556,42 @@ class SqlUpgradeTests(SqlMigrateBase):
insert.execute(d)
session.commit()
+ def test_upgrade_31_to_32(self):
+ self.upgrade(32)
+
+ user_table = self.select_table("user")
+ self.assertEquals(user_table.c.name.type.length, 255)
+
+ def test_downgrade_32_to_31(self):
+ self.upgrade(32)
+ session = self.Session()
+ # NOTE(aloga): we need a different metadata object
+ user_table = sqlalchemy.Table('user',
+ sqlalchemy.MetaData(),
+ autoload=True,
+ autoload_with=self.engine)
+ user_id = uuid.uuid4().hex
+ ins = user_table.insert().values(
+ {'id': user_id,
+ 'name': 'a' * 255,
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'domain_id': DEFAULT_DOMAIN_ID,
+ 'extra': '{}'})
+ session.execute(ins)
+ session.commit()
+
+ self.downgrade(31)
+ # Check that username has been truncated
+ q = session.query(user_table.c.name)
+ q = q.filter(user_table.c.id == user_id)
+ r = q.one()
+ user_name = r[0]
+ self.assertEquals(len(user_name), 64)
+
+ user_table = self.select_table("user")
+ self.assertEquals(user_table.c.name.type.length, 64)
+
def test_downgrade_to_0(self):
self.upgrade(self.max_version)
@@ -1362,7 +1398,7 @@ class SqlUpgradeTests(SqlMigrateBase):
total = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='%(database)s'" %
- locals())
+ dict(database=database))
self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
noninnodb = connection.execute("SELECT table_name "
@@ -1370,7 +1406,7 @@ class SqlUpgradeTests(SqlMigrateBase):
"where TABLE_SCHEMA='%(database)s' "
"and ENGINE!='InnoDB' "
"and TABLE_NAME!='migrate_version'" %
- locals())
+ dict(database=database))
names = [x[0] for x in noninnodb]
self.assertEqual(names, [],
"Non-InnoDB tables exist")
diff --git a/keystone/tests/test_v3_auth.py b/keystone/tests/test_v3_auth.py
index 43f87d98..1f4425ce 100644
--- a/keystone/tests/test_v3_auth.py
+++ b/keystone/tests/test_v3_auth.py
@@ -545,6 +545,67 @@ class TestTokenRevoking(test_v3.RestfulTestCase):
headers={'X-Subject-Token': token},
expected_status=204)
+ def test_disabling_project_revokes_token(self):
+ resp = self.post(
+ '/auth/tokens',
+ body=self.build_authentication_request(
+ user_id=self.user3['id'],
+ password=self.user3['password'],
+ project_id=self.projectA['id']))
+ token = resp.headers.get('X-Subject-Token')
+
+ # confirm token is valid
+ self.head('/auth/tokens',
+ headers={'X-Subject-Token': token},
+ expected_status=204)
+
+ # disable the project, which should invalidate the token
+ self.patch(
+ '/projects/%(project_id)s' % {'project_id': self.projectA['id']},
+ body={'project': {'enabled': False}})
+
+ # user should no longer have access to the project
+ self.head('/auth/tokens',
+ headers={'X-Subject-Token': token},
+ expected_status=401)
+ resp = self.post(
+ '/auth/tokens',
+ body=self.build_authentication_request(
+ user_id=self.user3['id'],
+ password=self.user3['password'],
+ project_id=self.projectA['id']),
+ expected_status=401)
+
+ def test_deleting_project_revokes_token(self):
+ resp = self.post(
+ '/auth/tokens',
+ body=self.build_authentication_request(
+ user_id=self.user3['id'],
+ password=self.user3['password'],
+ project_id=self.projectA['id']))
+ token = resp.headers.get('X-Subject-Token')
+
+ # confirm token is valid
+ self.head('/auth/tokens',
+ headers={'X-Subject-Token': token},
+ expected_status=204)
+
+ # delete the project, which should invalidate the token
+ self.delete(
+ '/projects/%(project_id)s' % {'project_id': self.projectA['id']})
+
+ # user should no longer have access to the project
+ self.head('/auth/tokens',
+ headers={'X-Subject-Token': token},
+ expected_status=401)
+ resp = self.post(
+ '/auth/tokens',
+ body=self.build_authentication_request(
+ user_id=self.user3['id'],
+ password=self.user3['password'],
+ project_id=self.projectA['id']),
+ expected_status=401)
+
def test_deleting_group_grant_revokes_tokens(self):
"""Test deleting a group grant revokes tokens.
diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py
new file mode 100644
index 00000000..a0ae5fc6
--- /dev/null
+++ b/keystone/tests/test_v3_oauth1.py
@@ -0,0 +1,574 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import os
+import urlparse
+import uuid
+
+import webtest
+
+from keystone.common import cms
+from keystone import config
+from keystone.contrib import oauth1
+from keystone.contrib.oauth1 import controllers
+from keystone.tests import core
+
+import test_v3
+
+
+OAUTH_PASTE_FILE = 'v3_oauth1-paste.ini'
+CONF = config.CONF
+
+
+class OAuth1Tests(test_v3.RestfulTestCase):
+ def setUp(self):
+ super(OAuth1Tests, self).setUp()
+ self.controller = controllers.OAuthControllerV3()
+ self.base_url = CONF.public_endpoint % CONF + "v3"
+ self._generate_paste_config()
+ self.load_backends()
+ self.admin_app = webtest.TestApp(
+ self.loadapp('v3_oauth1', name='admin'))
+ self.public_app = webtest.TestApp(
+ self.loadapp('v3_oauth1', name='admin'))
+
+ def tearDown(self):
+ os.remove(OAUTH_PASTE_FILE)
+
+ def _generate_paste_config(self):
+ # Generate a file, based on keystone-paste.ini,
+ # that includes oauth_extension in the pipeline
+ old_pipeline = " ec2_extension "
+ new_pipeline = " oauth_extension ec2_extension "
+
+ with open(core.etcdir('keystone-paste.ini'), 'r') as f:
+ contents = f.read()
+ new_contents = contents.replace(old_pipeline, new_pipeline)
+ with open(OAUTH_PASTE_FILE, 'w') as f:
+ f.write(new_contents)
+
+ def _create_single_consumer(self):
+ ref = {'description': uuid.uuid4().hex}
+ resp = self.post(
+ '/OS-OAUTH1/consumers',
+ body={'consumer': ref})
+ return resp.result.get('consumer')
+
+ def _oauth_request(self, consumer, token=None, **kw):
+ return oauth1.Request.from_consumer_and_token(consumer=consumer,
+ token=token,
+ **kw)
+
+ def _create_request_token(self, consumer, role, project_id):
+ params = {'requested_role_ids': role,
+ 'requested_project_id': project_id}
+ headers = {'Content-Type': 'application/json'}
+ url = '/OS-OAUTH1/request_token'
+ oreq = self._oauth_request(
+ consumer=consumer,
+ http_url=self.base_url + url,
+ http_method='POST',
+ parameters=params)
+
+ hmac = oauth1.SignatureMethod_HMAC_SHA1()
+ oreq.sign_request(hmac, consumer, None)
+ headers.update(oreq.to_header())
+ headers.update(params)
+ return url, headers
+
+ def _create_access_token(self, consumer, token):
+ headers = {'Content-Type': 'application/json'}
+ url = '/OS-OAUTH1/access_token'
+ oreq = self._oauth_request(
+ consumer=consumer, token=token,
+ http_method='POST',
+ http_url=self.base_url + url)
+ hmac = oauth1.SignatureMethod_HMAC_SHA1()
+ oreq.sign_request(hmac, consumer, token)
+ headers.update(oreq.to_header())
+ return url, headers
+
+ def _get_oauth_token(self, consumer, token):
+ headers = {'Content-Type': 'application/json'}
+ body = {'auth': {'identity': {'methods': ['oauth1'], 'oauth1': {}}}}
+ url = '/auth/tokens'
+ oreq = self._oauth_request(
+ consumer=consumer, token=token,
+ http_method='POST',
+ http_url=self.base_url + url)
+ hmac = oauth1.SignatureMethod_HMAC_SHA1()
+ oreq.sign_request(hmac, consumer, token)
+ headers.update(oreq.to_header())
+ return url, headers, body
+
+ def _authorize_request_token(self, request_id):
+ return '/OS-OAUTH1/authorize/%s' % (request_id)
+
+
+class ConsumerCRUDTests(OAuth1Tests):
+
+ def test_consumer_create(self):
+ description = uuid.uuid4().hex
+ ref = {'description': description}
+ resp = self.post(
+ '/OS-OAUTH1/consumers',
+ body={'consumer': ref})
+ consumer = resp.result.get('consumer')
+ consumer_id = consumer.get('id')
+ self.assertEqual(consumer.get('description'), description)
+ self.assertIsNotNone(consumer_id)
+ self.assertIsNotNone(consumer.get('secret'))
+
+ def test_consumer_delete(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': consumer_id})
+ self.assertResponseStatus(resp, 204)
+
+ def test_consumer_get(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ resp = self.get('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': consumer_id})
+ self.assertTrue(resp.result.get('consumer').get('id'), consumer_id)
+
+ def test_consumer_list(self):
+ resp = self.get('/OS-OAUTH1/consumers')
+ entities = resp.result.get('consumers')
+ self.assertIsNotNone(entities)
+ self.assertValidListLinks(resp.result.get('links'))
+
+ def test_consumer_update(self):
+ consumer = self._create_single_consumer()
+ original_id = consumer.get('id')
+ original_description = consumer.get('description')
+ original_secret = consumer.get('secret')
+ update_description = original_description + "_new"
+
+ update_ref = {'description': update_description}
+ update_resp = self.patch('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': original_id},
+ body={'consumer': update_ref})
+ consumer = update_resp.result.get('consumer')
+ self.assertEqual(consumer.get('description'), update_description)
+ self.assertEqual(consumer.get('id'), original_id)
+ self.assertEqual(consumer.get('secret'), original_secret)
+
+ def test_consumer_update_bad_secret(self):
+ consumer = self._create_single_consumer()
+ original_id = consumer.get('id')
+ update_ref = copy.deepcopy(consumer)
+ update_ref['description'] = uuid.uuid4().hex
+ update_ref['secret'] = uuid.uuid4().hex
+ self.patch('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': original_id},
+ body={'consumer': update_ref},
+ expected_status=400)
+
+ def test_consumer_update_bad_id(self):
+ consumer = self._create_single_consumer()
+ original_id = consumer.get('id')
+ original_description = consumer.get('description')
+ update_description = original_description + "_new"
+
+ update_ref = copy.deepcopy(consumer)
+ update_ref['description'] = update_description
+ update_ref['id'] = update_description
+ self.patch('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': original_id},
+ body={'consumer': update_ref},
+ expected_status=400)
+
+ def test_consumer_create_no_description(self):
+ resp = self.post('/OS-OAUTH1/consumers', body={'consumer': {}})
+ consumer = resp.result.get('consumer')
+ consumer_id = consumer.get('id')
+ self.assertEqual(consumer.get('description'), None)
+ self.assertIsNotNone(consumer_id)
+ self.assertIsNotNone(consumer.get('secret'))
+
+ def test_consumer_get_bad_id(self):
+ self.get('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': uuid.uuid4().hex},
+ expected_status=404)
+
+
+class OAuthFlowTests(OAuth1Tests):
+
+ def test_oauth_flow(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ self.consumer = oauth1.Consumer(consumer_id, consumer_secret)
+ self.assertIsNotNone(self.consumer.key)
+
+ url, headers = self._create_request_token(self.consumer,
+ self.role_id,
+ self.project_id)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ request_key = credentials.get('oauth_token')[0]
+ request_secret = credentials.get('oauth_token_secret')[0]
+ self.request_token = oauth1.Token(request_key, request_secret)
+ self.assertIsNotNone(self.request_token.key)
+
+ url = self._authorize_request_token(request_key)
+ resp = self.put(url, expected_status=200)
+ self.verifier = resp.result['token']['oauth_verifier']
+
+ self.request_token.set_verifier(self.verifier)
+ url, headers = self._create_access_token(self.consumer,
+ self.request_token)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ access_key = credentials.get('oauth_token')[0]
+ access_secret = credentials.get('oauth_token_secret')[0]
+ self.access_token = oauth1.Token(access_key, access_secret)
+ self.assertIsNotNone(self.access_token.key)
+
+ url, headers, body = self._get_oauth_token(self.consumer,
+ self.access_token)
+ content = self.post(url, headers=headers, body=body)
+ self.keystone_token_id = content.headers.get('X-Subject-Token')
+ self.keystone_token = content.result.get('token')
+ self.assertIsNotNone(self.keystone_token_id)
+
+
+class AccessTokenCRUDTests(OAuthFlowTests):
+ def test_delete_access_token_dne(self):
+ self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
+ % {'user': self.user_id,
+ 'auth': uuid.uuid4().hex},
+ expected_status=404)
+
+ def test_list_no_access_tokens(self):
+ resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
+ % {'user_id': self.user_id})
+ entities = resp.result.get('access_tokens')
+ self.assertTrue(len(entities) == 0)
+ self.assertValidListLinks(resp.result.get('links'))
+
+ def test_get_single_access_token(self):
+ self.test_oauth_flow()
+ resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s'
+ % {'user_id': self.user_id,
+ 'key': self.access_token.key})
+ entity = resp.result.get('access_token')
+ self.assertTrue(entity['id'], self.access_token.key)
+ self.assertTrue(entity['consumer_id'], self.consumer.key)
+
+ def test_get_access_token_dne(self):
+ self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s'
+ % {'user_id': self.user_id,
+ 'key': uuid.uuid4().hex},
+ expected_status=404)
+
+ def test_list_all_roles_in_access_token(self):
+ self.test_oauth_flow()
+ resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles'
+ % {'id': self.user_id,
+ 'key': self.access_token.key})
+ entities = resp.result.get('roles')
+ self.assertTrue(len(entities) > 0)
+ self.assertValidListLinks(resp.result.get('links'))
+
+ def test_get_role_in_access_token(self):
+ self.test_oauth_flow()
+ url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
+ % {'id': self.user_id, 'key': self.access_token.key,
+ 'role': self.role_id})
+ resp = self.get(url)
+ entity = resp.result.get('role')
+ self.assertTrue(entity['id'], self.role_id)
+
+ def test_get_role_in_access_token_dne(self):
+ self.test_oauth_flow()
+ url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s'
+ % {'id': self.user_id, 'key': self.access_token.key,
+ 'role': uuid.uuid4().hex})
+ self.get(url, expected_status=404)
+
+ def test_list_and_delete_access_tokens(self):
+ self.test_oauth_flow()
+ # List access_tokens should be > 0
+ resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
+ % {'user_id': self.user_id})
+ entities = resp.result.get('access_tokens')
+ self.assertTrue(len(entities) > 0)
+ self.assertValidListLinks(resp.result.get('links'))
+
+ # Delete access_token
+ resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
+ % {'user': self.user_id,
+ 'auth': self.access_token.key})
+ self.assertResponseStatus(resp, 204)
+
+ # List access_token should be 0
+ resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
+ % {'user_id': self.user_id})
+ entities = resp.result.get('access_tokens')
+ self.assertTrue(len(entities) == 0)
+ self.assertValidListLinks(resp.result.get('links'))
+
+
+class AuthTokenTests(OAuthFlowTests):
+
+ def test_keystone_token_is_valid(self):
+ self.test_oauth_flow()
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ r = self.get('/auth/tokens', headers=headers)
+ self.assertValidTokenResponse(r, self.user)
+
+ # now verify the oauth section
+ oauth_section = r.result['token']['OS-OAUTH1']
+ self.assertEquals(oauth_section['access_token_id'],
+ self.access_token.key)
+ self.assertEquals(oauth_section['consumer_id'], self.consumer.key)
+
+ def test_delete_access_token_also_revokes_token(self):
+ self.test_oauth_flow()
+
+ # Delete access token
+ resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
+ % {'user': self.user_id,
+ 'auth': self.access_token.key})
+ self.assertResponseStatus(resp, 204)
+
+ # Check Keystone Token no longer exists
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ self.get('/auth/tokens', headers=headers,
+ expected_status=401)
+
+ def test_deleting_consumer_also_deletes_tokens(self):
+ self.test_oauth_flow()
+
+ # Delete consumer
+ consumer_id = self.consumer.key
+ resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s'
+ % {'consumer_id': consumer_id})
+ self.assertResponseStatus(resp, 204)
+
+ # List access_token should be 0
+ resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
+ % {'user_id': self.user_id})
+ entities = resp.result.get('access_tokens')
+ self.assertEqual(len(entities), 0)
+
+ # Check Keystone Token no longer exists
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ self.head('/auth/tokens', headers=headers,
+ expected_status=401)
+
+ def test_change_user_password_also_deletes_tokens(self):
+ self.test_oauth_flow()
+
+ # delegated keystone token exists
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ r = self.get('/auth/tokens', headers=headers)
+ self.assertValidTokenResponse(r, self.user)
+
+ user = {'password': uuid.uuid4().hex}
+ r = self.patch('/users/%(user_id)s' % {
+ 'user_id': self.user['id']},
+ body={'user': user})
+
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ self.admin_request(path='/auth/tokens', headers=headers,
+ method='GET', expected_status=404)
+
+ def test_deleting_project_also_invalidates_tokens(self):
+ self.test_oauth_flow()
+
+ # delegated keystone token exists
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ r = self.get('/auth/tokens', headers=headers)
+ self.assertValidTokenResponse(r, self.user)
+
+ r = self.delete('/projects/%(project_id)s' % {
+ 'project_id': self.project_id})
+
+ headers = {'X-Subject-Token': self.keystone_token_id,
+ 'X-Auth-Token': self.keystone_token_id}
+ self.admin_request(path='/auth/tokens', headers=headers,
+ method='GET', expected_status=404)
+
+ def test_token_chaining_is_not_allowed(self):
+ self.test_oauth_flow()
+
+ #attempt to re-authenticate (token chain) with the given token
+ path = '/v3/auth/tokens/'
+ auth_data = self.build_authentication_request(
+ token=self.keystone_token_id)
+
+ self.admin_request(
+ path=path,
+ body=auth_data,
+ token=self.keystone_token_id,
+ method='POST',
+ expected_status=403)
+
+ def test_list_keystone_tokens_by_consumer(self):
+ self.test_oauth_flow()
+ tokens = self.token_api.list_tokens(self.user_id,
+ consumer_id=self.consumer.key)
+ keystone_token_uuid = cms.cms_hash_token(self.keystone_token_id)
+ self.assertTrue(len(tokens) > 0)
+ self.assertTrue(keystone_token_uuid in tokens)
+
+
+class MaliciousOAuth1Tests(OAuth1Tests):
+
+ def test_bad_consumer_secret(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer = oauth1.Consumer(consumer_id, "bad_secret")
+ url, headers = self._create_request_token(consumer,
+ self.role_id,
+ self.project_id)
+ self.post(url, headers=headers, expected_status=500)
+
+ def test_bad_request_token_key(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ consumer = oauth1.Consumer(consumer_id, consumer_secret)
+ url, headers = self._create_request_token(consumer,
+ self.role_id,
+ self.project_id)
+ self.post(url, headers=headers)
+ url = self._authorize_request_token("bad_key")
+ self.put(url, expected_status=404)
+
+ def test_bad_verifier(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ consumer = oauth1.Consumer(consumer_id, consumer_secret)
+
+ url, headers = self._create_request_token(consumer,
+ self.role_id,
+ self.project_id)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ request_key = credentials.get('oauth_token')[0]
+ request_secret = credentials.get('oauth_token_secret')[0]
+ request_token = oauth1.Token(request_key, request_secret)
+
+ url = self._authorize_request_token(request_key)
+ resp = self.put(url, expected_status=200)
+ verifier = resp.result['token']['oauth_verifier']
+ self.assertIsNotNone(verifier)
+
+ request_token.set_verifier("bad verifier")
+ url, headers = self._create_access_token(consumer,
+ request_token)
+ self.post(url, headers=headers, expected_status=401)
+
+ def test_bad_requested_roles(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ consumer = oauth1.Consumer(consumer_id, consumer_secret)
+
+ url, headers = self._create_request_token(consumer,
+ "bad_role",
+ self.project_id)
+ self.post(url, headers=headers, expected_status=401)
+
+ def test_bad_authorizing_roles(self):
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ consumer = oauth1.Consumer(consumer_id, consumer_secret)
+
+ url, headers = self._create_request_token(consumer,
+ self.role_id,
+ self.project_id)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ request_key = credentials.get('oauth_token')[0]
+
+ self.identity_api.remove_role_from_user_and_project(self.user_id,
+ self.project_id,
+ self.role_id)
+ url = self._authorize_request_token(request_key)
+ self.admin_request(path=url, method='PUT', expected_status=404)
+
+ def test_expired_authorizing_request_token(self):
+ CONF.oauth1.request_token_duration = -1
+
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ self.consumer = oauth1.Consumer(consumer_id, consumer_secret)
+ self.assertIsNotNone(self.consumer.key)
+
+ url, headers = self._create_request_token(self.consumer,
+ self.role_id,
+ self.project_id)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ request_key = credentials.get('oauth_token')[0]
+ request_secret = credentials.get('oauth_token_secret')[0]
+ self.request_token = oauth1.Token(request_key, request_secret)
+ self.assertIsNotNone(self.request_token.key)
+
+ url = self._authorize_request_token(request_key)
+ self.put(url, expected_status=401)
+
+ def test_expired_creating_keystone_token(self):
+ CONF.oauth1.access_token_duration = -1
+ consumer = self._create_single_consumer()
+ consumer_id = consumer.get('id')
+ consumer_secret = consumer.get('secret')
+ self.consumer = oauth1.Consumer(consumer_id, consumer_secret)
+ self.assertIsNotNone(self.consumer.key)
+
+ url, headers = self._create_request_token(self.consumer,
+ self.role_id,
+ self.project_id)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ request_key = credentials.get('oauth_token')[0]
+ request_secret = credentials.get('oauth_token_secret')[0]
+ self.request_token = oauth1.Token(request_key, request_secret)
+ self.assertIsNotNone(self.request_token.key)
+
+ url = self._authorize_request_token(request_key)
+ resp = self.put(url, expected_status=200)
+ self.verifier = resp.result['token']['oauth_verifier']
+
+ self.request_token.set_verifier(self.verifier)
+ url, headers = self._create_access_token(self.consumer,
+ self.request_token)
+ content = self.post(url, headers=headers)
+ credentials = urlparse.parse_qs(content.result)
+ access_key = credentials.get('oauth_token')[0]
+ access_secret = credentials.get('oauth_token_secret')[0]
+ self.access_token = oauth1.Token(access_key, access_secret)
+ self.assertIsNotNone(self.access_token.key)
+
+ url, headers, body = self._get_oauth_token(self.consumer,
+ self.access_token)
+ self.post(url, headers=headers, body=body, expected_status=401)