diff options
Diffstat (limited to 'keystone/tests')
-rw-r--r-- | keystone/tests/_ldap_livetest.py | 11 | ||||
-rw-r--r-- | keystone/tests/backend_multi_ldap_sql.conf | 35 | ||||
-rw-r--r-- | keystone/tests/core.py | 16 | ||||
-rw-r--r-- | keystone/tests/keystone.Default.conf | 14 | ||||
-rw-r--r-- | keystone/tests/keystone.domain1.conf | 11 | ||||
-rw-r--r-- | keystone/tests/keystone.domain2.conf | 13 | ||||
-rw-r--r-- | keystone/tests/test_backend.py | 11 | ||||
-rw-r--r-- | keystone/tests/test_backend_ldap.py | 330 | ||||
-rw-r--r-- | keystone/tests/test_backend_sql.py | 2 | ||||
-rw-r--r-- | keystone/tests/test_drivers.py | 5 | ||||
-rw-r--r-- | keystone/tests/test_keystoneclient.py | 55 | ||||
-rw-r--r-- | keystone/tests/test_overrides.conf | 3 | ||||
-rw-r--r-- | keystone/tests/test_s3_token_middleware.py | 4 | ||||
-rw-r--r-- | keystone/tests/test_sql_migrate_extensions.py | 63 | ||||
-rw-r--r-- | keystone/tests/test_sql_upgrade.py | 40 | ||||
-rw-r--r-- | keystone/tests/test_v3_auth.py | 61 | ||||
-rw-r--r-- | keystone/tests/test_v3_oauth1.py | 574 |
17 files changed, 1204 insertions, 44 deletions
diff --git a/keystone/tests/_ldap_livetest.py b/keystone/tests/_ldap_livetest.py index 59da4e66..4562ccb6 100644 --- a/keystone/tests/_ldap_livetest.py +++ b/keystone/tests/_ldap_livetest.py @@ -87,9 +87,6 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity): def tearDown(self): test.TestCase.tearDown(self) - def test_user_enable_attribute_mask(self): - self.skipTest('Test is for Active Directory Only') - def test_ldap_dereferencing(self): alt_users_ldif = {'objectclass': ['top', 'organizationalUnit'], 'ou': 'alt_users'} @@ -158,3 +155,11 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity): alias_dereferencing=deref) self.assertEqual(ldap.DEREF_SEARCHING, ldap_wrapper.conn.get_option(ldap.OPT_DEREF)) + + def test_user_enable_attribute_mask(self): + CONF.ldap.user_enabled_emulation = False + CONF.ldap.user_enabled_attribute = 'employeeType' + super(LiveLDAPIdentity, self).test_user_enable_attribute_mask() + + def test_create_unicode_user_name(self): + self.skipTest('Addressed by bug #1172106') diff --git a/keystone/tests/backend_multi_ldap_sql.conf b/keystone/tests/backend_multi_ldap_sql.conf new file mode 100644 index 00000000..59cff761 --- /dev/null +++ b/keystone/tests/backend_multi_ldap_sql.conf @@ -0,0 +1,35 @@ +[sql] +connection = sqlite:// +#For a file based sqlite use +#connection = sqlite:////tmp/keystone.db +#To Test MySQL: +#connection = mysql://keystone:keystone@localhost/keystone?charset=utf8 +#To Test PostgreSQL: +#connection = postgresql://keystone:keystone@localhost/keystone?client_encoding=utf8 +idle_timeout = 200 + +[identity] +# common identity backend is SQL, domain specific configs will +# set their backends to ldap +driver = keystone.identity.backends.sql.Identity +# The test setup will set this to True, to allow easier creation +# of initial domain data +# domain_specific_drivers_enabled = True + +[assignment] +driver = keystone.assignment.backends.sql.Assignment + +[token] +driver = keystone.token.backends.sql.Token + +[ec2] +driver = keystone.contrib.ec2.backends.sql.Ec2 + +[catalog] +driver = keystone.catalog.backends.sql.Catalog + +[policy] +driver = keystone.policy.backends.sql.Policy + +[trust] +driver = keystone.trust.backends.sql.Trust diff --git a/keystone/tests/core.py b/keystone/tests/core.py index 21dc61dc..cba6cbf8 100644 --- a/keystone/tests/core.py +++ b/keystone/tests/core.py @@ -40,15 +40,16 @@ from keystone import assignment from keystone import catalog from keystone.common import dependency from keystone.common import kvs -from keystone.common import logging from keystone.common import sql from keystone.common import utils from keystone.common import wsgi from keystone import config from keystone.contrib import ec2 +from keystone.contrib import oauth1 from keystone import credential from keystone import exception from keystone import identity +from keystone.openstack.common import log as logging from keystone.openstack.common import timeutils from keystone import policy from keystone import token @@ -68,9 +69,6 @@ CONF = config.CONF cd = os.chdir -logging.getLogger('routes.middleware').level = logging.WARN - - def rootdir(*p): return os.path.join(ROOTDIR, *p) @@ -271,7 +269,7 @@ class TestCase(NoModule, unittest.TestCase): # assignment manager gets the default assignment driver from the # identity driver. for manager in [identity, assignment, catalog, credential, ec2, policy, - token, token_provider, trust]: + token, token_provider, trust, oauth1]: # manager.__name__ is like keystone.xxx[.yyy], # converted to xxx[_yyy] manager_name = ('%s_api' % @@ -295,9 +293,11 @@ class TestCase(NoModule, unittest.TestCase): for domain in fixtures.DOMAINS: try: rv = self.identity_api.create_domain(domain['id'], domain) - except (exception.Conflict, exception.NotImplemented): - pass - setattr(self, 'domain_%s' % domain['id'], domain) + except exception.Conflict: + rv = self.identity_api.get_domain(domain['id']) + except exception.NotImplemented: + rv = domain + setattr(self, 'domain_%s' % domain['id'], rv) for tenant in fixtures.TENANTS: try: diff --git a/keystone/tests/keystone.Default.conf b/keystone/tests/keystone.Default.conf new file mode 100644 index 00000000..7049afed --- /dev/null +++ b/keystone/tests/keystone.Default.conf @@ -0,0 +1,14 @@ +# The domain-specific configuration file for the default domain for +# use with unit tests. +# +# The domain_name of the default domain is 'Default', hence the +# strange mix of upper/lower case in the file name. + +[ldap] +url = fake://memory +user = cn=Admin +password = password +suffix = cn=example,cn=com + +[identity] +driver = keystone.identity.backends.ldap.Identity
\ No newline at end of file diff --git a/keystone/tests/keystone.domain1.conf b/keystone/tests/keystone.domain1.conf new file mode 100644 index 00000000..6b7e2488 --- /dev/null +++ b/keystone/tests/keystone.domain1.conf @@ -0,0 +1,11 @@ +# The domain-specific configuration file for the test domain +# 'domain1' for use with unit tests. + +[ldap] +url = fake://memory1 +user = cn=Admin +password = password +suffix = cn=example,cn=com + +[identity] +driver = keystone.identity.backends.ldap.Identity
\ No newline at end of file diff --git a/keystone/tests/keystone.domain2.conf b/keystone/tests/keystone.domain2.conf new file mode 100644 index 00000000..0ed68eb9 --- /dev/null +++ b/keystone/tests/keystone.domain2.conf @@ -0,0 +1,13 @@ +# The domain-specific configuration file for the test domain +# 'domain2' for use with unit tests. + +[ldap] +url = fake://memory +user = cn=Admin +password = password +suffix = cn=myroot,cn=com +group_tree_dn = ou=UserGroups,dc=myroot,dc=org +user_tree_dn = ou=Users,dc=myroot,dc=org + +[identity] +driver = keystone.identity.backends.ldap.Identity
\ No newline at end of file diff --git a/keystone/tests/test_backend.py b/keystone/tests/test_backend.py index 8622b10d..8013deec 100644 --- a/keystone/tests/test_backend.py +++ b/keystone/tests/test_backend.py @@ -105,7 +105,9 @@ class IdentityTests(object): self.assertIn(CONF.member_role_id, role_list) def test_password_hashed(self): - user_ref = self.identity_api._get_user(self.user_foo['id']) + driver = self.identity_api._select_identity_driver( + self.user_foo['domain_id']) + user_ref = driver._get_user(self.user_foo['id']) self.assertNotEqual(user_ref['password'], self.user_foo['password']) def test_create_unicode_user_name(self): @@ -1521,7 +1523,8 @@ class IdentityTests(object): self.assertRaises(exception.UserNotFound, self.identity_api.update_user, user_id, - {'id': user_id}) + {'id': user_id, + 'domain_id': DEFAULT_DOMAIN_ID}) def test_delete_user_with_project_association(self): user = {'id': uuid.uuid4().hex, @@ -1628,7 +1631,7 @@ class IdentityTests(object): tenant) def test_create_user_long_name_fails(self): - user = {'id': 'fake1', 'name': 'a' * 65, + user = {'id': 'fake1', 'name': 'a' * 256, 'domain_id': DEFAULT_DOMAIN_ID} self.assertRaises(exception.ValidationError, self.identity_api.create_user, @@ -1701,7 +1704,7 @@ class IdentityTests(object): user = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} self.identity_api.create_user('fake1', user) - user['name'] = 'a' * 65 + user['name'] = 'a' * 256 self.assertRaises(exception.ValidationError, self.identity_api.update_user, 'fake1', diff --git a/keystone/tests/test_backend_ldap.py b/keystone/tests/test_backend_ldap.py index 9c1c98d5..23379712 100644 --- a/keystone/tests/test_backend_ldap.py +++ b/keystone/tests/test_backend_ldap.py @@ -17,6 +17,8 @@ import uuid +import ldap + from keystone import assignment from keystone.common.ldap import fakeldap from keystone.common import sql @@ -38,8 +40,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests): return self.identity_api.get_domain(CONF.identity.default_domain_id) def clear_database(self): - db = fakeldap.FakeShelve().get_instance() - db.clear() + for shelf in fakeldap.FakeShelves: + fakeldap.FakeShelves[shelf].clear() + + def reload_backends(self, domain_id): + # Only one backend unless we are using separate domain backends + self.load_backends() + + def get_config(self, domain_id): + # Only one conf structure unless we are using separate domain backends + return CONF def _set_config(self): self.config([test.etcdir('keystone.conf.sample'), @@ -57,6 +67,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): user = {'id': 'fake1', 'name': 'fake1', 'password': 'fakepass1', + 'domain_id': CONF.identity.default_domain_id, 'tenants': ['bar']} self.identity_api.create_user('fake1', user) user_ref = self.identity_api.get_user('fake1') @@ -71,14 +82,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests): 'fake1') def test_configurable_forbidden_user_actions(self): - CONF.ldap.user_allow_create = False - CONF.ldap.user_allow_update = False - CONF.ldap.user_allow_delete = False - self.load_backends() + conf = self.get_config(CONF.identity.default_domain_id) + conf.ldap.user_allow_create = False + conf.ldap.user_allow_update = False + conf.ldap.user_allow_delete = False + self.reload_backends(CONF.identity.default_domain_id) user = {'id': 'fake1', 'name': 'fake1', 'password': 'fakepass1', + 'domain_id': CONF.identity.default_domain_id, 'tenants': ['bar']} self.assertRaises(exception.ForbiddenAction, self.identity_api.create_user, @@ -100,8 +113,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.user_foo.pop('password') self.assertDictEqual(user_ref, self.user_foo) - CONF.ldap.user_filter = '(CN=DOES_NOT_MATCH)' - self.load_backends() + conf = self.get_config(user_ref['domain_id']) + conf.ldap.user_filter = '(CN=DOES_NOT_MATCH)' + self.reload_backends(user_ref['domain_id']) self.assertRaises(exception.UserNotFound, self.identity_api.get_user, self.user_foo['id']) @@ -205,18 +219,21 @@ class BaseLDAPIdentity(test_backend.IdentityTests): # Create a group group_id = None - group = dict(name=uuid.uuid4().hex) + group = dict(name=uuid.uuid4().hex, + domain_id=CONF.identity.default_domain_id) group_id = self.identity_api.create_group(group_id, group)['id'] # Create a couple of users and add them to the group. user_id = None - user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) + user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex, + domain_id=CONF.identity.default_domain_id) user_1_id = self.identity_api.create_user(user_id, user)['id'] self.identity_api.add_user_to_group(user_1_id, group_id) user_id = None - user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex) + user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex, + domain_id=CONF.identity.default_domain_id) user_2_id = self.identity_api.create_user(user_id, user)['id'] self.identity_api.add_user_to_group(user_2_id, group_id) @@ -224,7 +241,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): # Delete user 2 # NOTE(blk-u): need to go directly to user interface to keep from # updating the group. - self.identity_api.driver.user.delete(user_2_id) + driver = self.identity_api._select_identity_driver( + user['domain_id']) + driver.user.delete(user_2_id) # List group users and verify only user 1. res = self.identity_api.list_users_in_group(group_id) @@ -249,13 +268,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.identity_api.create_user(user['id'], user) self.identity_api.add_user_to_project(self.tenant_baz['id'], user['id']) - self.identity_api.driver.user.LDAP_USER = None - self.identity_api.driver.user.LDAP_PASSWORD = None + driver = self.identity_api._select_identity_driver( + user['domain_id']) + driver.user.LDAP_USER = None + driver.user.LDAP_PASSWORD = None self.assertRaises(AssertionError, self.identity_api.authenticate, user_id=user['id'], - password=None) + password=None, + domain_scope=user['domain_id']) # (spzala)The group and domain crud tests below override the standard ones # in test_backend.py so that we can exclude the update name test, since we @@ -454,24 +476,56 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity): self.assertNotIn('name', role_ref) def test_user_enable_attribute_mask(self): - CONF.ldap.user_enabled_attribute = 'enabled' CONF.ldap.user_enabled_mask = 2 - CONF.ldap.user_enabled_default = 512 + CONF.ldap.user_enabled_default = '512' self.clear_database() - user = {'id': 'fake1', 'name': 'fake1', 'enabled': True} - self.identity_api.create_user('fake1', user) + self.load_backends() + self.load_fixtures(default_fixtures) + + ldap_ = self.identity_api.driver.user.get_connection() + + def get_enabled_vals(): + user_dn = self.identity_api.driver.user._id_to_dn_string('fake1') + enabled_attr_name = CONF.ldap.user_enabled_attribute + + res = ldap_.search_s(user_dn, + ldap.SCOPE_BASE, + query='(sn=fake1)') + return res[0][1][enabled_attr_name] + + user = {'id': 'fake1', 'name': 'fake1', 'enabled': True, + 'domain_id': CONF.identity.default_domain_id} + + user_ref = self.identity_api.create_user('fake1', user) + + self.assertEqual(user_ref['enabled'], 512) + # TODO(blk-u): 512 seems wrong, should it be True? + + enabled_vals = get_enabled_vals() + self.assertEqual(enabled_vals, [512]) + user_ref = self.identity_api.get_user('fake1') - self.assertEqual(user_ref['enabled'], True) + self.assertIs(user_ref['enabled'], True) user['enabled'] = False - self.identity_api.update_user('fake1', user) + user_ref = self.identity_api.update_user('fake1', user) + self.assertIs(user_ref['enabled'], False) + + enabled_vals = get_enabled_vals() + self.assertEqual(enabled_vals, [514]) + user_ref = self.identity_api.get_user('fake1') - self.assertEqual(user_ref['enabled'], False) + self.assertIs(user_ref['enabled'], False) user['enabled'] = True - self.identity_api.update_user('fake1', user) + user_ref = self.identity_api.update_user('fake1', user) + self.assertIs(user_ref['enabled'], True) + + enabled_vals = get_enabled_vals() + self.assertEqual(enabled_vals, [512]) + user_ref = self.identity_api.get_user('fake1') - self.assertEqual(user_ref['enabled'], True) + self.assertIs(user_ref['enabled'], True) def test_user_api_get_connection_no_user_password(self): """Don't bind in case the user and password are blank.""" @@ -510,6 +564,7 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity): 'id': 'extra_attributes', 'name': 'EXTRA_ATTRIBUTES', 'password': 'extra', + 'domain_id': CONF.identity.default_domain_id } self.identity_api.create_user(user['id'], user) dn, attrs = self.identity_api.driver.user._ldap_get(user['id']) @@ -743,3 +798,230 @@ class LdapIdentitySqlAssignment(sql.Base, test.TestCase, BaseLDAPIdentity): def test_role_filter(self): self.skipTest( 'N/A: Not part of SQL backend') + + +class MultiLDAPandSQLIdentity(sql.Base, test.TestCase, BaseLDAPIdentity): + """Class to test common SQL plus individual LDAP backends. + + We define a set of domains and domain-specific backends: + + - A separate LDAP backend for the default domain + - A separate LDAP backend for domain1 + - domain2 shares the same LDAP as domain1, but uses a different + tree attach point + - An SQL backend for all other domains (which will include domain3 + and domain4) + + Normally one would expect that the default domain would be handled as + part of the "other domains" - however the above provides better + test coverage since most of the existing backend tests use the default + domain. + + """ + def setUp(self): + super(MultiLDAPandSQLIdentity, self).setUp() + + self._set_config() + self.load_backends() + self.engine = self.get_engine() + sql.ModelBase.metadata.create_all(bind=self.engine) + self._setup_domain_test_data() + + # All initial domain data setup complete, time to switch on support + # for separate backends per domain. + + self.orig_config_domains_enabled = ( + config.CONF.identity.domain_specific_drivers_enabled) + self.opt_in_group('identity', domain_specific_drivers_enabled=True) + self.orig_config_dir = ( + config.CONF.identity.domain_config_dir) + self.opt_in_group('identity', domain_config_dir=test.TESTSDIR) + self._set_domain_configs() + self.clear_database() + self.load_fixtures(default_fixtures) + + def tearDown(self): + super(MultiLDAPandSQLIdentity, self).tearDown() + self.opt_in_group( + 'identity', + domain_config_dir=self.orig_config_dir) + self.opt_in_group( + 'identity', + domain_specific_drivers_enabled=self.orig_config_domains_enabled) + sql.ModelBase.metadata.drop_all(bind=self.engine) + self.engine.dispose() + sql.set_global_engine(None) + + def _set_config(self): + self.config([test.etcdir('keystone.conf.sample'), + test.testsdir('test_overrides.conf'), + test.testsdir('backend_multi_ldap_sql.conf')]) + + def _setup_domain_test_data(self): + + def create_domain(domain): + try: + ref = self.assignment_api.create_domain( + domain['id'], domain) + except exception.Conflict: + ref = ( + self.assignment_api.get_domain_by_name(domain['name'])) + return ref + + self.domain_default = create_domain(assignment.DEFAULT_DOMAIN) + self.domain1 = create_domain( + {'id': uuid.uuid4().hex, 'name': 'domain1'}) + self.domain2 = create_domain( + {'id': uuid.uuid4().hex, 'name': 'domain2'}) + self.domain3 = create_domain( + {'id': uuid.uuid4().hex, 'name': 'domain3'}) + self.domain4 = create_domain( + {'id': uuid.uuid4().hex, 'name': 'domain4'}) + + def _set_domain_configs(self): + # We need to load the domain configs explicitly to ensure the + # test overrides are included. + self.identity_api.domain_configs._load_config( + self.identity_api.assignment_api, + [test.etcdir('keystone.conf.sample'), + test.testsdir('test_overrides.conf'), + test.testsdir('backend_multi_ldap_sql.conf'), + test.testsdir('keystone.Default.conf')], + 'Default') + self.identity_api.domain_configs._load_config( + self.identity_api.assignment_api, + [test.etcdir('keystone.conf.sample'), + test.testsdir('test_overrides.conf'), + test.testsdir('backend_multi_ldap_sql.conf'), + test.testsdir('keystone.domain1.conf')], + 'domain1') + self.identity_api.domain_configs._load_config( + self.identity_api.assignment_api, + [test.etcdir('keystone.conf.sample'), + test.testsdir('test_overrides.conf'), + test.testsdir('backend_multi_ldap_sql.conf'), + test.testsdir('keystone.domain2.conf')], + 'domain2') + + def reload_backends(self, domain_id): + # Just reload the driver for this domain - which will pickup + # any updated cfg + self.identity_api.domain_configs.reload_domain_driver( + self.identity_api.assignment_api, domain_id) + + def get_config(self, domain_id): + # Get the config for this domain, will return CONF + # if no specific config defined for this domain + return self.identity_api.domain_configs.get_domain_conf(domain_id) + + def test_list_domains(self): + self.skipTest( + 'N/A: Not relevant for multi ldap testing') + + def test_domain_segregation(self): + """Test that separate configs have segregated the domain. + + Test Plan: + - Create a user in each of the domains + - Make sure that you can only find a given user in its + relevant domain + - Make sure that for a backend that supports multiple domains + you can get the users via any of the domain scopes + + """ + def create_user(domain_id): + user = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain_id, + 'password': uuid.uuid4().hex, + 'enabled': True} + self.identity_api.create_user(user['id'], user) + return user + + userd = create_user(CONF.identity.default_domain_id) + user1 = create_user(self.domain1['id']) + user2 = create_user(self.domain2['id']) + user3 = create_user(self.domain3['id']) + user4 = create_user(self.domain4['id']) + + # Now check that I can read user1 with the appropriate domain + # scope, but won't find it if the wrong scope is used + + ref = self.identity_api.get_user( + userd['id'], domain_scope=CONF.identity.default_domain_id) + del userd['password'] + self.assertDictEqual(ref, userd) + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + userd['id'], + domain_scope=self.domain1['id']) + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + userd['id'], + domain_scope=self.domain2['id']) + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + userd['id'], + domain_scope=self.domain3['id']) + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + userd['id'], + domain_scope=self.domain4['id']) + + ref = self.identity_api.get_user( + user1['id'], domain_scope=self.domain1['id']) + del user1['password'] + self.assertDictEqual(ref, user1) + ref = self.identity_api.get_user( + user2['id'], domain_scope=self.domain2['id']) + del user2['password'] + self.assertDictEqual(ref, user2) + + # Domains 3 and 4 share the same backend, so you should be + # able to see user3 and 4 from either + + ref = self.identity_api.get_user( + user3['id'], domain_scope=self.domain3['id']) + del user3['password'] + self.assertDictEqual(ref, user3) + ref = self.identity_api.get_user( + user4['id'], domain_scope=self.domain4['id']) + del user4['password'] + self.assertDictEqual(ref, user4) + ref = self.identity_api.get_user( + user3['id'], domain_scope=self.domain4['id']) + self.assertDictEqual(ref, user3) + ref = self.identity_api.get_user( + user4['id'], domain_scope=self.domain3['id']) + self.assertDictEqual(ref, user4) + + def test_scanning_of_config_dir(self): + """Test the Manager class scans the config directory. + + The setup for the main tests above load the domain configs directly + so that the test overrides can be included. This test just makes sure + that the standard config directory scanning does pick up the relevant + domain config files. + + """ + # Confirm that config has drivers_enabled as True, which we will + # check has been set to False later in this test + self.assertTrue(config.CONF.identity.domain_specific_drivers_enabled) + self.load_backends() + # Execute any command to trigger the lazy loading of domain configs + self.identity_api.list_users(domain_scope=self.domain1['id']) + # ...and now check the domain configs have been set up + self.assertIn('default', self.identity_api.domain_configs) + self.assertIn(self.domain1['id'], self.identity_api.domain_configs) + self.assertIn(self.domain2['id'], self.identity_api.domain_configs) + self.assertNotIn(self.domain3['id'], self.identity_api.domain_configs) + self.assertNotIn(self.domain4['id'], self.identity_api.domain_configs) + + # Finally check that a domain specific config contains items from both + # the primary config and the domain specific config + conf = self.identity_api.domain_configs.get_domain_conf( + self.domain1['id']) + # This should now be false, as is the default, since this is not + # set in the standard primary config file + self.assertFalse(conf.identity.domain_specific_drivers_enabled) + # ..and make sure a domain-specifc options is also set + self.assertEqual(conf.ldap.url, 'fake://memory1') diff --git a/keystone/tests/test_backend_sql.py b/keystone/tests/test_backend_sql.py index 773ae862..24159eb6 100644 --- a/keystone/tests/test_backend_sql.py +++ b/keystone/tests/test_backend_sql.py @@ -81,7 +81,7 @@ class SqlModels(SqlTests): def test_user_model(self): cols = (('id', sql.String, 64), - ('name', sql.String, 64), + ('name', sql.String, 255), ('password', sql.String, 128), ('domain_id', sql.String, 64), ('enabled', sql.Boolean, None), diff --git a/keystone/tests/test_drivers.py b/keystone/tests/test_drivers.py index c83c1a89..888b365c 100644 --- a/keystone/tests/test_drivers.py +++ b/keystone/tests/test_drivers.py @@ -3,6 +3,7 @@ import unittest2 as unittest from keystone import assignment from keystone import catalog +from keystone.contrib import oauth1 from keystone import exception from keystone import identity from keystone import policy @@ -55,3 +56,7 @@ class TestDrivers(unittest.TestCase): def test_token_driver_unimplemented(self): interface = token.Driver() self.assertInterfaceNotImplemented(interface) + + def test_oauth1_driver_unimplemented(self): + interface = oauth1.Driver() + self.assertInterfaceNotImplemented(interface) diff --git a/keystone/tests/test_keystoneclient.py b/keystone/tests/test_keystoneclient.py index 7e59885d..ff2462f4 100644 --- a/keystone/tests/test_keystoneclient.py +++ b/keystone/tests/test_keystoneclient.py @@ -378,6 +378,46 @@ class KeystoneClientTests(object): client.tokens.authenticate, token=token_id) + def test_disable_tenant_invalidates_token(self): + from keystoneclient import exceptions as client_exceptions + + admin_client = self.get_client(admin=True) + foo_client = self.get_client(self.user_foo) + tenant_bar = admin_client.tenants.get(self.tenant_bar['id']) + + # Disable the tenant. + tenant_bar.update(enabled=False) + + # Test that the token has been removed. + self.assertRaises(client_exceptions.Unauthorized, + foo_client.tokens.authenticate, + token=foo_client.auth_token) + + # Test that the user access has been disabled. + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, + self.user_foo) + + def test_delete_tenant_invalidates_token(self): + from keystoneclient import exceptions as client_exceptions + + admin_client = self.get_client(admin=True) + foo_client = self.get_client(self.user_foo) + tenant_bar = admin_client.tenants.get(self.tenant_bar['id']) + + # Delete the tenant. + tenant_bar.delete() + + # Test that the token has been removed. + self.assertRaises(client_exceptions.Unauthorized, + foo_client.tokens.authenticate, + token=foo_client.auth_token) + + # Test that the user access has been disabled. + self.assertRaises(client_exceptions.Unauthorized, + self.get_client, + self.user_foo) + def test_disable_user_invalidates_token(self): from keystoneclient import exceptions as client_exceptions @@ -495,6 +535,15 @@ class KeystoneClientTests(object): user = client.users.update_tenant( user=user, tenant=self.tenant_bar['id']) + def test_user_create_no_string_password(self): + from keystoneclient import exceptions as client_exceptions + client = self.get_client(admin=True) + self.assertRaises(client_exceptions.BadRequest, + client.users.create, + name='test_user', + password=12345, + email=uuid.uuid4().hex) + def test_user_create_no_name(self): from keystoneclient import exceptions as client_exceptions client = self.get_client(admin=True) @@ -1165,6 +1214,12 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests): def test_policy_crud(self): self.skipTest('N/A due to lack of endpoint CRUD') + def test_disable_tenant_invalidates_token(self): + self.skipTest('N/A') + + def test_delete_tenant_invalidates_token(self): + self.skipTest('N/A') + class Kc11TestCase(CompatTestCase, KeystoneClientTests): def get_checkout(self): diff --git a/keystone/tests/test_overrides.conf b/keystone/tests/test_overrides.conf index aac29f26..5cd522b2 100644 --- a/keystone/tests/test_overrides.conf +++ b/keystone/tests/test_overrides.conf @@ -14,6 +14,9 @@ driver = keystone.trust.backends.kvs.Trust [token] driver = keystone.token.backends.kvs.Token +[oauth1] +driver = keystone.contrib.oauth1.backends.kvs.OAuth1 + [signing] certfile = ../../examples/pki/certs/signing_cert.pem keyfile = ../../examples/pki/private/signing_key.pem diff --git a/keystone/tests/test_s3_token_middleware.py b/keystone/tests/test_s3_token_middleware.py index ec31f2ac..2d561c10 100644 --- a/keystone/tests/test_s3_token_middleware.py +++ b/keystone/tests/test_s3_token_middleware.py @@ -225,9 +225,9 @@ class S3TokenMiddlewareTestUtil(unittest.TestCase): def test_split_path_invalid_path(self): try: s3_token.split_path('o\nn e', 2) - except ValueError, err: + except ValueError as err: self.assertEquals(str(err), 'Invalid path: o%0An%20e') try: s3_token.split_path('o\nn e', 2, 3, True) - except ValueError, err: + except ValueError as err: self.assertEquals(str(err), 'Invalid path: o%0An%20e') diff --git a/keystone/tests/test_sql_migrate_extensions.py b/keystone/tests/test_sql_migrate_extensions.py index 4a529559..f9393cbe 100644 --- a/keystone/tests/test_sql_migrate_extensions.py +++ b/keystone/tests/test_sql_migrate_extensions.py @@ -27,6 +27,7 @@ To run these tests against a live database: """ from keystone.contrib import example +from keystone.contrib import oauth1 import test_sql_upgrade @@ -45,3 +46,65 @@ class SqlUpgradeExampleExtension(test_sql_upgrade.SqlMigrateBase): self.assertTableColumns('example', ['id', 'type', 'extra']) self.downgrade(0, repository=self.repo_path) self.assertTableDoesNotExist('example') + + +class SqlUpgradeOAuth1Extension(test_sql_upgrade.SqlMigrateBase): + def repo_package(self): + return oauth1 + + def test_upgrade(self): + self.assertTableDoesNotExist('consumer') + self.assertTableDoesNotExist('request_token') + self.assertTableDoesNotExist('access_token') + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('consumer', + ['id', + 'description', + 'secret', + 'extra']) + self.assertTableColumns('request_token', + ['id', + 'request_secret', + 'verifier', + 'authorizing_user_id', + 'requested_project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.assertTableColumns('access_token', + ['id', + 'access_secret', + 'authorizing_user_id', + 'project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + + def test_downgrade(self): + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('consumer', + ['id', + 'description', + 'secret', + 'extra']) + self.assertTableColumns('request_token', + ['id', + 'request_secret', + 'verifier', + 'authorizing_user_id', + 'requested_project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.assertTableColumns('access_token', + ['id', + 'access_secret', + 'authorizing_user_id', + 'project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.downgrade(0, repository=self.repo_path) + self.assertTableDoesNotExist('consumer') + self.assertTableDoesNotExist('request_token') + self.assertTableDoesNotExist('access_token') diff --git a/keystone/tests/test_sql_upgrade.py b/keystone/tests/test_sql_upgrade.py index e904d6a7..0ee63433 100644 --- a/keystone/tests/test_sql_upgrade.py +++ b/keystone/tests/test_sql_upgrade.py @@ -556,6 +556,42 @@ class SqlUpgradeTests(SqlMigrateBase): insert.execute(d) session.commit() + def test_upgrade_31_to_32(self): + self.upgrade(32) + + user_table = self.select_table("user") + self.assertEquals(user_table.c.name.type.length, 255) + + def test_downgrade_32_to_31(self): + self.upgrade(32) + session = self.Session() + # NOTE(aloga): we need a different metadata object + user_table = sqlalchemy.Table('user', + sqlalchemy.MetaData(), + autoload=True, + autoload_with=self.engine) + user_id = uuid.uuid4().hex + ins = user_table.insert().values( + {'id': user_id, + 'name': 'a' * 255, + 'password': uuid.uuid4().hex, + 'enabled': True, + 'domain_id': DEFAULT_DOMAIN_ID, + 'extra': '{}'}) + session.execute(ins) + session.commit() + + self.downgrade(31) + # Check that username has been truncated + q = session.query(user_table.c.name) + q = q.filter(user_table.c.id == user_id) + r = q.one() + user_name = r[0] + self.assertEquals(len(user_name), 64) + + user_table = self.select_table("user") + self.assertEquals(user_table.c.name.type.length, 64) + def test_downgrade_to_0(self): self.upgrade(self.max_version) @@ -1362,7 +1398,7 @@ class SqlUpgradeTests(SqlMigrateBase): total = connection.execute("SELECT count(*) " "from information_schema.TABLES " "where TABLE_SCHEMA='%(database)s'" % - locals()) + dict(database=database)) self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?") noninnodb = connection.execute("SELECT table_name " @@ -1370,7 +1406,7 @@ class SqlUpgradeTests(SqlMigrateBase): "where TABLE_SCHEMA='%(database)s' " "and ENGINE!='InnoDB' " "and TABLE_NAME!='migrate_version'" % - locals()) + dict(database=database)) names = [x[0] for x in noninnodb] self.assertEqual(names, [], "Non-InnoDB tables exist") diff --git a/keystone/tests/test_v3_auth.py b/keystone/tests/test_v3_auth.py index 43f87d98..1f4425ce 100644 --- a/keystone/tests/test_v3_auth.py +++ b/keystone/tests/test_v3_auth.py @@ -545,6 +545,67 @@ class TestTokenRevoking(test_v3.RestfulTestCase): headers={'X-Subject-Token': token}, expected_status=204) + def test_disabling_project_revokes_token(self): + resp = self.post( + '/auth/tokens', + body=self.build_authentication_request( + user_id=self.user3['id'], + password=self.user3['password'], + project_id=self.projectA['id'])) + token = resp.headers.get('X-Subject-Token') + + # confirm token is valid + self.head('/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=204) + + # disable the project, which should invalidate the token + self.patch( + '/projects/%(project_id)s' % {'project_id': self.projectA['id']}, + body={'project': {'enabled': False}}) + + # user should no longer have access to the project + self.head('/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=401) + resp = self.post( + '/auth/tokens', + body=self.build_authentication_request( + user_id=self.user3['id'], + password=self.user3['password'], + project_id=self.projectA['id']), + expected_status=401) + + def test_deleting_project_revokes_token(self): + resp = self.post( + '/auth/tokens', + body=self.build_authentication_request( + user_id=self.user3['id'], + password=self.user3['password'], + project_id=self.projectA['id'])) + token = resp.headers.get('X-Subject-Token') + + # confirm token is valid + self.head('/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=204) + + # delete the project, which should invalidate the token + self.delete( + '/projects/%(project_id)s' % {'project_id': self.projectA['id']}) + + # user should no longer have access to the project + self.head('/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=401) + resp = self.post( + '/auth/tokens', + body=self.build_authentication_request( + user_id=self.user3['id'], + password=self.user3['password'], + project_id=self.projectA['id']), + expected_status=401) + def test_deleting_group_grant_revokes_tokens(self): """Test deleting a group grant revokes tokens. diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py new file mode 100644 index 00000000..a0ae5fc6 --- /dev/null +++ b/keystone/tests/test_v3_oauth1.py @@ -0,0 +1,574 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import os +import urlparse +import uuid + +import webtest + +from keystone.common import cms +from keystone import config +from keystone.contrib import oauth1 +from keystone.contrib.oauth1 import controllers +from keystone.tests import core + +import test_v3 + + +OAUTH_PASTE_FILE = 'v3_oauth1-paste.ini' +CONF = config.CONF + + +class OAuth1Tests(test_v3.RestfulTestCase): + def setUp(self): + super(OAuth1Tests, self).setUp() + self.controller = controllers.OAuthControllerV3() + self.base_url = CONF.public_endpoint % CONF + "v3" + self._generate_paste_config() + self.load_backends() + self.admin_app = webtest.TestApp( + self.loadapp('v3_oauth1', name='admin')) + self.public_app = webtest.TestApp( + self.loadapp('v3_oauth1', name='admin')) + + def tearDown(self): + os.remove(OAUTH_PASTE_FILE) + + def _generate_paste_config(self): + # Generate a file, based on keystone-paste.ini, + # that includes oauth_extension in the pipeline + old_pipeline = " ec2_extension " + new_pipeline = " oauth_extension ec2_extension " + + with open(core.etcdir('keystone-paste.ini'), 'r') as f: + contents = f.read() + new_contents = contents.replace(old_pipeline, new_pipeline) + with open(OAUTH_PASTE_FILE, 'w') as f: + f.write(new_contents) + + def _create_single_consumer(self): + ref = {'description': uuid.uuid4().hex} + resp = self.post( + '/OS-OAUTH1/consumers', + body={'consumer': ref}) + return resp.result.get('consumer') + + def _oauth_request(self, consumer, token=None, **kw): + return oauth1.Request.from_consumer_and_token(consumer=consumer, + token=token, + **kw) + + def _create_request_token(self, consumer, role, project_id): + params = {'requested_role_ids': role, + 'requested_project_id': project_id} + headers = {'Content-Type': 'application/json'} + url = '/OS-OAUTH1/request_token' + oreq = self._oauth_request( + consumer=consumer, + http_url=self.base_url + url, + http_method='POST', + parameters=params) + + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, None) + headers.update(oreq.to_header()) + headers.update(params) + return url, headers + + def _create_access_token(self, consumer, token): + headers = {'Content-Type': 'application/json'} + url = '/OS-OAUTH1/access_token' + oreq = self._oauth_request( + consumer=consumer, token=token, + http_method='POST', + http_url=self.base_url + url) + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, token) + headers.update(oreq.to_header()) + return url, headers + + def _get_oauth_token(self, consumer, token): + headers = {'Content-Type': 'application/json'} + body = {'auth': {'identity': {'methods': ['oauth1'], 'oauth1': {}}}} + url = '/auth/tokens' + oreq = self._oauth_request( + consumer=consumer, token=token, + http_method='POST', + http_url=self.base_url + url) + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, token) + headers.update(oreq.to_header()) + return url, headers, body + + def _authorize_request_token(self, request_id): + return '/OS-OAUTH1/authorize/%s' % (request_id) + + +class ConsumerCRUDTests(OAuth1Tests): + + def test_consumer_create(self): + description = uuid.uuid4().hex + ref = {'description': description} + resp = self.post( + '/OS-OAUTH1/consumers', + body={'consumer': ref}) + consumer = resp.result.get('consumer') + consumer_id = consumer.get('id') + self.assertEqual(consumer.get('description'), description) + self.assertIsNotNone(consumer_id) + self.assertIsNotNone(consumer.get('secret')) + + def test_consumer_delete(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertResponseStatus(resp, 204) + + def test_consumer_get(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + resp = self.get('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertTrue(resp.result.get('consumer').get('id'), consumer_id) + + def test_consumer_list(self): + resp = self.get('/OS-OAUTH1/consumers') + entities = resp.result.get('consumers') + self.assertIsNotNone(entities) + self.assertValidListLinks(resp.result.get('links')) + + def test_consumer_update(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + original_description = consumer.get('description') + original_secret = consumer.get('secret') + update_description = original_description + "_new" + + update_ref = {'description': update_description} + update_resp = self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}) + consumer = update_resp.result.get('consumer') + self.assertEqual(consumer.get('description'), update_description) + self.assertEqual(consumer.get('id'), original_id) + self.assertEqual(consumer.get('secret'), original_secret) + + def test_consumer_update_bad_secret(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + update_ref = copy.deepcopy(consumer) + update_ref['description'] = uuid.uuid4().hex + update_ref['secret'] = uuid.uuid4().hex + self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}, + expected_status=400) + + def test_consumer_update_bad_id(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + original_description = consumer.get('description') + update_description = original_description + "_new" + + update_ref = copy.deepcopy(consumer) + update_ref['description'] = update_description + update_ref['id'] = update_description + self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}, + expected_status=400) + + def test_consumer_create_no_description(self): + resp = self.post('/OS-OAUTH1/consumers', body={'consumer': {}}) + consumer = resp.result.get('consumer') + consumer_id = consumer.get('id') + self.assertEqual(consumer.get('description'), None) + self.assertIsNotNone(consumer_id) + self.assertIsNotNone(consumer.get('secret')) + + def test_consumer_get_bad_id(self): + self.get('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': uuid.uuid4().hex}, + expected_status=404) + + +class OAuthFlowTests(OAuth1Tests): + + def test_oauth_flow(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + self.verifier = resp.result['token']['oauth_verifier'] + + self.request_token.set_verifier(self.verifier) + url, headers = self._create_access_token(self.consumer, + self.request_token) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + access_key = credentials.get('oauth_token')[0] + access_secret = credentials.get('oauth_token_secret')[0] + self.access_token = oauth1.Token(access_key, access_secret) + self.assertIsNotNone(self.access_token.key) + + url, headers, body = self._get_oauth_token(self.consumer, + self.access_token) + content = self.post(url, headers=headers, body=body) + self.keystone_token_id = content.headers.get('X-Subject-Token') + self.keystone_token = content.result.get('token') + self.assertIsNotNone(self.keystone_token_id) + + +class AccessTokenCRUDTests(OAuthFlowTests): + def test_delete_access_token_dne(self): + self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': uuid.uuid4().hex}, + expected_status=404) + + def test_list_no_access_tokens(self): + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) == 0) + self.assertValidListLinks(resp.result.get('links')) + + def test_get_single_access_token(self): + self.test_oauth_flow() + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' + % {'user_id': self.user_id, + 'key': self.access_token.key}) + entity = resp.result.get('access_token') + self.assertTrue(entity['id'], self.access_token.key) + self.assertTrue(entity['consumer_id'], self.consumer.key) + + def test_get_access_token_dne(self): + self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' + % {'user_id': self.user_id, + 'key': uuid.uuid4().hex}, + expected_status=404) + + def test_list_all_roles_in_access_token(self): + self.test_oauth_flow() + resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles' + % {'id': self.user_id, + 'key': self.access_token.key}) + entities = resp.result.get('roles') + self.assertTrue(len(entities) > 0) + self.assertValidListLinks(resp.result.get('links')) + + def test_get_role_in_access_token(self): + self.test_oauth_flow() + url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' + % {'id': self.user_id, 'key': self.access_token.key, + 'role': self.role_id}) + resp = self.get(url) + entity = resp.result.get('role') + self.assertTrue(entity['id'], self.role_id) + + def test_get_role_in_access_token_dne(self): + self.test_oauth_flow() + url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' + % {'id': self.user_id, 'key': self.access_token.key, + 'role': uuid.uuid4().hex}) + self.get(url, expected_status=404) + + def test_list_and_delete_access_tokens(self): + self.test_oauth_flow() + # List access_tokens should be > 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) > 0) + self.assertValidListLinks(resp.result.get('links')) + + # Delete access_token + resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': self.access_token.key}) + self.assertResponseStatus(resp, 204) + + # List access_token should be 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) == 0) + self.assertValidListLinks(resp.result.get('links')) + + +class AuthTokenTests(OAuthFlowTests): + + def test_keystone_token_is_valid(self): + self.test_oauth_flow() + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + # now verify the oauth section + oauth_section = r.result['token']['OS-OAUTH1'] + self.assertEquals(oauth_section['access_token_id'], + self.access_token.key) + self.assertEquals(oauth_section['consumer_id'], self.consumer.key) + + def test_delete_access_token_also_revokes_token(self): + self.test_oauth_flow() + + # Delete access token + resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': self.access_token.key}) + self.assertResponseStatus(resp, 204) + + # Check Keystone Token no longer exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.get('/auth/tokens', headers=headers, + expected_status=401) + + def test_deleting_consumer_also_deletes_tokens(self): + self.test_oauth_flow() + + # Delete consumer + consumer_id = self.consumer.key + resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertResponseStatus(resp, 204) + + # List access_token should be 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertEqual(len(entities), 0) + + # Check Keystone Token no longer exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.head('/auth/tokens', headers=headers, + expected_status=401) + + def test_change_user_password_also_deletes_tokens(self): + self.test_oauth_flow() + + # delegated keystone token exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + user = {'password': uuid.uuid4().hex} + r = self.patch('/users/%(user_id)s' % { + 'user_id': self.user['id']}, + body={'user': user}) + + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.admin_request(path='/auth/tokens', headers=headers, + method='GET', expected_status=404) + + def test_deleting_project_also_invalidates_tokens(self): + self.test_oauth_flow() + + # delegated keystone token exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + r = self.delete('/projects/%(project_id)s' % { + 'project_id': self.project_id}) + + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.admin_request(path='/auth/tokens', headers=headers, + method='GET', expected_status=404) + + def test_token_chaining_is_not_allowed(self): + self.test_oauth_flow() + + #attempt to re-authenticate (token chain) with the given token + path = '/v3/auth/tokens/' + auth_data = self.build_authentication_request( + token=self.keystone_token_id) + + self.admin_request( + path=path, + body=auth_data, + token=self.keystone_token_id, + method='POST', + expected_status=403) + + def test_list_keystone_tokens_by_consumer(self): + self.test_oauth_flow() + tokens = self.token_api.list_tokens(self.user_id, + consumer_id=self.consumer.key) + keystone_token_uuid = cms.cms_hash_token(self.keystone_token_id) + self.assertTrue(len(tokens) > 0) + self.assertTrue(keystone_token_uuid in tokens) + + +class MaliciousOAuth1Tests(OAuth1Tests): + + def test_bad_consumer_secret(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer = oauth1.Consumer(consumer_id, "bad_secret") + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + self.post(url, headers=headers, expected_status=500) + + def test_bad_request_token_key(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + self.post(url, headers=headers) + url = self._authorize_request_token("bad_key") + self.put(url, expected_status=404) + + def test_bad_verifier(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + verifier = resp.result['token']['oauth_verifier'] + self.assertIsNotNone(verifier) + + request_token.set_verifier("bad verifier") + url, headers = self._create_access_token(consumer, + request_token) + self.post(url, headers=headers, expected_status=401) + + def test_bad_requested_roles(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + "bad_role", + self.project_id) + self.post(url, headers=headers, expected_status=401) + + def test_bad_authorizing_roles(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + + self.identity_api.remove_role_from_user_and_project(self.user_id, + self.project_id, + self.role_id) + url = self._authorize_request_token(request_key) + self.admin_request(path=url, method='PUT', expected_status=404) + + def test_expired_authorizing_request_token(self): + CONF.oauth1.request_token_duration = -1 + + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + self.put(url, expected_status=401) + + def test_expired_creating_keystone_token(self): + CONF.oauth1.access_token_duration = -1 + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + self.verifier = resp.result['token']['oauth_verifier'] + + self.request_token.set_verifier(self.verifier) + url, headers = self._create_access_token(self.consumer, + self.request_token) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + access_key = credentials.get('oauth_token')[0] + access_secret = credentials.get('oauth_token_secret')[0] + self.access_token = oauth1.Token(access_key, access_secret) + self.assertIsNotNone(self.access_token.key) + + url, headers, body = self._get_oauth_token(self.consumer, + self.access_token) + self.post(url, headers=headers, body=body, expected_status=401) |