diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-08-17 04:27:21 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-08-17 04:27:21 +0000 |
| commit | 14cba15fcb9a03adb82bbe586f6431791077f1e8 (patch) | |
| tree | 7fb8487ce511a853da2b8abe9e6662dab9487b5a /keystone/tests | |
| parent | 781c65b72b78bd1e2b1d93db029d6b0c6fbc2050 (diff) | |
| parent | bcaa3072f37d3af3f9d526f18f311411ceeae160 (diff) | |
| download | keystone-14cba15fcb9a03adb82bbe586f6431791077f1e8.tar.gz keystone-14cba15fcb9a03adb82bbe586f6431791077f1e8.tar.xz keystone-14cba15fcb9a03adb82bbe586f6431791077f1e8.zip | |
Merge "Add delegated_auth support for keystone"
Diffstat (limited to 'keystone/tests')
| -rw-r--r-- | keystone/tests/core.py | 3 | ||||
| -rw-r--r-- | keystone/tests/test_drivers.py | 5 | ||||
| -rw-r--r-- | keystone/tests/test_overrides.conf | 3 | ||||
| -rw-r--r-- | keystone/tests/test_sql_migrate_extensions.py | 63 | ||||
| -rw-r--r-- | keystone/tests/test_v3_oauth1.py | 574 |
5 files changed, 647 insertions, 1 deletions
diff --git a/keystone/tests/core.py b/keystone/tests/core.py index b42a8709..cba6cbf8 100644 --- a/keystone/tests/core.py +++ b/keystone/tests/core.py @@ -45,6 +45,7 @@ from keystone.common import utils from keystone.common import wsgi from keystone import config from keystone.contrib import ec2 +from keystone.contrib import oauth1 from keystone import credential from keystone import exception from keystone import identity @@ -268,7 +269,7 @@ class TestCase(NoModule, unittest.TestCase): # assignment manager gets the default assignment driver from the # identity driver. for manager in [identity, assignment, catalog, credential, ec2, policy, - token, token_provider, trust]: + token, token_provider, trust, oauth1]: # manager.__name__ is like keystone.xxx[.yyy], # converted to xxx[_yyy] manager_name = ('%s_api' % diff --git a/keystone/tests/test_drivers.py b/keystone/tests/test_drivers.py index c83c1a89..888b365c 100644 --- a/keystone/tests/test_drivers.py +++ b/keystone/tests/test_drivers.py @@ -3,6 +3,7 @@ import unittest2 as unittest from keystone import assignment from keystone import catalog +from keystone.contrib import oauth1 from keystone import exception from keystone import identity from keystone import policy @@ -55,3 +56,7 @@ class TestDrivers(unittest.TestCase): def test_token_driver_unimplemented(self): interface = token.Driver() self.assertInterfaceNotImplemented(interface) + + def test_oauth1_driver_unimplemented(self): + interface = oauth1.Driver() + self.assertInterfaceNotImplemented(interface) diff --git a/keystone/tests/test_overrides.conf b/keystone/tests/test_overrides.conf index aac29f26..5cd522b2 100644 --- a/keystone/tests/test_overrides.conf +++ b/keystone/tests/test_overrides.conf @@ -14,6 +14,9 @@ driver = keystone.trust.backends.kvs.Trust [token] driver = keystone.token.backends.kvs.Token +[oauth1] +driver = keystone.contrib.oauth1.backends.kvs.OAuth1 + [signing] certfile = ../../examples/pki/certs/signing_cert.pem keyfile = ../../examples/pki/private/signing_key.pem diff --git a/keystone/tests/test_sql_migrate_extensions.py b/keystone/tests/test_sql_migrate_extensions.py index 4a529559..f9393cbe 100644 --- a/keystone/tests/test_sql_migrate_extensions.py +++ b/keystone/tests/test_sql_migrate_extensions.py @@ -27,6 +27,7 @@ To run these tests against a live database: """ from keystone.contrib import example +from keystone.contrib import oauth1 import test_sql_upgrade @@ -45,3 +46,65 @@ class SqlUpgradeExampleExtension(test_sql_upgrade.SqlMigrateBase): self.assertTableColumns('example', ['id', 'type', 'extra']) self.downgrade(0, repository=self.repo_path) self.assertTableDoesNotExist('example') + + +class SqlUpgradeOAuth1Extension(test_sql_upgrade.SqlMigrateBase): + def repo_package(self): + return oauth1 + + def test_upgrade(self): + self.assertTableDoesNotExist('consumer') + self.assertTableDoesNotExist('request_token') + self.assertTableDoesNotExist('access_token') + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('consumer', + ['id', + 'description', + 'secret', + 'extra']) + self.assertTableColumns('request_token', + ['id', + 'request_secret', + 'verifier', + 'authorizing_user_id', + 'requested_project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.assertTableColumns('access_token', + ['id', + 'access_secret', + 'authorizing_user_id', + 'project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + + def test_downgrade(self): + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('consumer', + ['id', + 'description', + 'secret', + 'extra']) + self.assertTableColumns('request_token', + ['id', + 'request_secret', + 'verifier', + 'authorizing_user_id', + 'requested_project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.assertTableColumns('access_token', + ['id', + 'access_secret', + 'authorizing_user_id', + 'project_id', + 'requested_roles', + 'consumer_id', + 'expires_at']) + self.downgrade(0, repository=self.repo_path) + self.assertTableDoesNotExist('consumer') + self.assertTableDoesNotExist('request_token') + self.assertTableDoesNotExist('access_token') diff --git a/keystone/tests/test_v3_oauth1.py b/keystone/tests/test_v3_oauth1.py new file mode 100644 index 00000000..a0ae5fc6 --- /dev/null +++ b/keystone/tests/test_v3_oauth1.py @@ -0,0 +1,574 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import os +import urlparse +import uuid + +import webtest + +from keystone.common import cms +from keystone import config +from keystone.contrib import oauth1 +from keystone.contrib.oauth1 import controllers +from keystone.tests import core + +import test_v3 + + +OAUTH_PASTE_FILE = 'v3_oauth1-paste.ini' +CONF = config.CONF + + +class OAuth1Tests(test_v3.RestfulTestCase): + def setUp(self): + super(OAuth1Tests, self).setUp() + self.controller = controllers.OAuthControllerV3() + self.base_url = CONF.public_endpoint % CONF + "v3" + self._generate_paste_config() + self.load_backends() + self.admin_app = webtest.TestApp( + self.loadapp('v3_oauth1', name='admin')) + self.public_app = webtest.TestApp( + self.loadapp('v3_oauth1', name='admin')) + + def tearDown(self): + os.remove(OAUTH_PASTE_FILE) + + def _generate_paste_config(self): + # Generate a file, based on keystone-paste.ini, + # that includes oauth_extension in the pipeline + old_pipeline = " ec2_extension " + new_pipeline = " oauth_extension ec2_extension " + + with open(core.etcdir('keystone-paste.ini'), 'r') as f: + contents = f.read() + new_contents = contents.replace(old_pipeline, new_pipeline) + with open(OAUTH_PASTE_FILE, 'w') as f: + f.write(new_contents) + + def _create_single_consumer(self): + ref = {'description': uuid.uuid4().hex} + resp = self.post( + '/OS-OAUTH1/consumers', + body={'consumer': ref}) + return resp.result.get('consumer') + + def _oauth_request(self, consumer, token=None, **kw): + return oauth1.Request.from_consumer_and_token(consumer=consumer, + token=token, + **kw) + + def _create_request_token(self, consumer, role, project_id): + params = {'requested_role_ids': role, + 'requested_project_id': project_id} + headers = {'Content-Type': 'application/json'} + url = '/OS-OAUTH1/request_token' + oreq = self._oauth_request( + consumer=consumer, + http_url=self.base_url + url, + http_method='POST', + parameters=params) + + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, None) + headers.update(oreq.to_header()) + headers.update(params) + return url, headers + + def _create_access_token(self, consumer, token): + headers = {'Content-Type': 'application/json'} + url = '/OS-OAUTH1/access_token' + oreq = self._oauth_request( + consumer=consumer, token=token, + http_method='POST', + http_url=self.base_url + url) + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, token) + headers.update(oreq.to_header()) + return url, headers + + def _get_oauth_token(self, consumer, token): + headers = {'Content-Type': 'application/json'} + body = {'auth': {'identity': {'methods': ['oauth1'], 'oauth1': {}}}} + url = '/auth/tokens' + oreq = self._oauth_request( + consumer=consumer, token=token, + http_method='POST', + http_url=self.base_url + url) + hmac = oauth1.SignatureMethod_HMAC_SHA1() + oreq.sign_request(hmac, consumer, token) + headers.update(oreq.to_header()) + return url, headers, body + + def _authorize_request_token(self, request_id): + return '/OS-OAUTH1/authorize/%s' % (request_id) + + +class ConsumerCRUDTests(OAuth1Tests): + + def test_consumer_create(self): + description = uuid.uuid4().hex + ref = {'description': description} + resp = self.post( + '/OS-OAUTH1/consumers', + body={'consumer': ref}) + consumer = resp.result.get('consumer') + consumer_id = consumer.get('id') + self.assertEqual(consumer.get('description'), description) + self.assertIsNotNone(consumer_id) + self.assertIsNotNone(consumer.get('secret')) + + def test_consumer_delete(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertResponseStatus(resp, 204) + + def test_consumer_get(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + resp = self.get('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertTrue(resp.result.get('consumer').get('id'), consumer_id) + + def test_consumer_list(self): + resp = self.get('/OS-OAUTH1/consumers') + entities = resp.result.get('consumers') + self.assertIsNotNone(entities) + self.assertValidListLinks(resp.result.get('links')) + + def test_consumer_update(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + original_description = consumer.get('description') + original_secret = consumer.get('secret') + update_description = original_description + "_new" + + update_ref = {'description': update_description} + update_resp = self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}) + consumer = update_resp.result.get('consumer') + self.assertEqual(consumer.get('description'), update_description) + self.assertEqual(consumer.get('id'), original_id) + self.assertEqual(consumer.get('secret'), original_secret) + + def test_consumer_update_bad_secret(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + update_ref = copy.deepcopy(consumer) + update_ref['description'] = uuid.uuid4().hex + update_ref['secret'] = uuid.uuid4().hex + self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}, + expected_status=400) + + def test_consumer_update_bad_id(self): + consumer = self._create_single_consumer() + original_id = consumer.get('id') + original_description = consumer.get('description') + update_description = original_description + "_new" + + update_ref = copy.deepcopy(consumer) + update_ref['description'] = update_description + update_ref['id'] = update_description + self.patch('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': original_id}, + body={'consumer': update_ref}, + expected_status=400) + + def test_consumer_create_no_description(self): + resp = self.post('/OS-OAUTH1/consumers', body={'consumer': {}}) + consumer = resp.result.get('consumer') + consumer_id = consumer.get('id') + self.assertEqual(consumer.get('description'), None) + self.assertIsNotNone(consumer_id) + self.assertIsNotNone(consumer.get('secret')) + + def test_consumer_get_bad_id(self): + self.get('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': uuid.uuid4().hex}, + expected_status=404) + + +class OAuthFlowTests(OAuth1Tests): + + def test_oauth_flow(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + self.verifier = resp.result['token']['oauth_verifier'] + + self.request_token.set_verifier(self.verifier) + url, headers = self._create_access_token(self.consumer, + self.request_token) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + access_key = credentials.get('oauth_token')[0] + access_secret = credentials.get('oauth_token_secret')[0] + self.access_token = oauth1.Token(access_key, access_secret) + self.assertIsNotNone(self.access_token.key) + + url, headers, body = self._get_oauth_token(self.consumer, + self.access_token) + content = self.post(url, headers=headers, body=body) + self.keystone_token_id = content.headers.get('X-Subject-Token') + self.keystone_token = content.result.get('token') + self.assertIsNotNone(self.keystone_token_id) + + +class AccessTokenCRUDTests(OAuthFlowTests): + def test_delete_access_token_dne(self): + self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': uuid.uuid4().hex}, + expected_status=404) + + def test_list_no_access_tokens(self): + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) == 0) + self.assertValidListLinks(resp.result.get('links')) + + def test_get_single_access_token(self): + self.test_oauth_flow() + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' + % {'user_id': self.user_id, + 'key': self.access_token.key}) + entity = resp.result.get('access_token') + self.assertTrue(entity['id'], self.access_token.key) + self.assertTrue(entity['consumer_id'], self.consumer.key) + + def test_get_access_token_dne(self): + self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens/%(key)s' + % {'user_id': self.user_id, + 'key': uuid.uuid4().hex}, + expected_status=404) + + def test_list_all_roles_in_access_token(self): + self.test_oauth_flow() + resp = self.get('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles' + % {'id': self.user_id, + 'key': self.access_token.key}) + entities = resp.result.get('roles') + self.assertTrue(len(entities) > 0) + self.assertValidListLinks(resp.result.get('links')) + + def test_get_role_in_access_token(self): + self.test_oauth_flow() + url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' + % {'id': self.user_id, 'key': self.access_token.key, + 'role': self.role_id}) + resp = self.get(url) + entity = resp.result.get('role') + self.assertTrue(entity['id'], self.role_id) + + def test_get_role_in_access_token_dne(self): + self.test_oauth_flow() + url = ('/users/%(id)s/OS-OAUTH1/access_tokens/%(key)s/roles/%(role)s' + % {'id': self.user_id, 'key': self.access_token.key, + 'role': uuid.uuid4().hex}) + self.get(url, expected_status=404) + + def test_list_and_delete_access_tokens(self): + self.test_oauth_flow() + # List access_tokens should be > 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) > 0) + self.assertValidListLinks(resp.result.get('links')) + + # Delete access_token + resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': self.access_token.key}) + self.assertResponseStatus(resp, 204) + + # List access_token should be 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertTrue(len(entities) == 0) + self.assertValidListLinks(resp.result.get('links')) + + +class AuthTokenTests(OAuthFlowTests): + + def test_keystone_token_is_valid(self): + self.test_oauth_flow() + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + # now verify the oauth section + oauth_section = r.result['token']['OS-OAUTH1'] + self.assertEquals(oauth_section['access_token_id'], + self.access_token.key) + self.assertEquals(oauth_section['consumer_id'], self.consumer.key) + + def test_delete_access_token_also_revokes_token(self): + self.test_oauth_flow() + + # Delete access token + resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' + % {'user': self.user_id, + 'auth': self.access_token.key}) + self.assertResponseStatus(resp, 204) + + # Check Keystone Token no longer exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.get('/auth/tokens', headers=headers, + expected_status=401) + + def test_deleting_consumer_also_deletes_tokens(self): + self.test_oauth_flow() + + # Delete consumer + consumer_id = self.consumer.key + resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' + % {'consumer_id': consumer_id}) + self.assertResponseStatus(resp, 204) + + # List access_token should be 0 + resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' + % {'user_id': self.user_id}) + entities = resp.result.get('access_tokens') + self.assertEqual(len(entities), 0) + + # Check Keystone Token no longer exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.head('/auth/tokens', headers=headers, + expected_status=401) + + def test_change_user_password_also_deletes_tokens(self): + self.test_oauth_flow() + + # delegated keystone token exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + user = {'password': uuid.uuid4().hex} + r = self.patch('/users/%(user_id)s' % { + 'user_id': self.user['id']}, + body={'user': user}) + + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.admin_request(path='/auth/tokens', headers=headers, + method='GET', expected_status=404) + + def test_deleting_project_also_invalidates_tokens(self): + self.test_oauth_flow() + + # delegated keystone token exists + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidTokenResponse(r, self.user) + + r = self.delete('/projects/%(project_id)s' % { + 'project_id': self.project_id}) + + headers = {'X-Subject-Token': self.keystone_token_id, + 'X-Auth-Token': self.keystone_token_id} + self.admin_request(path='/auth/tokens', headers=headers, + method='GET', expected_status=404) + + def test_token_chaining_is_not_allowed(self): + self.test_oauth_flow() + + #attempt to re-authenticate (token chain) with the given token + path = '/v3/auth/tokens/' + auth_data = self.build_authentication_request( + token=self.keystone_token_id) + + self.admin_request( + path=path, + body=auth_data, + token=self.keystone_token_id, + method='POST', + expected_status=403) + + def test_list_keystone_tokens_by_consumer(self): + self.test_oauth_flow() + tokens = self.token_api.list_tokens(self.user_id, + consumer_id=self.consumer.key) + keystone_token_uuid = cms.cms_hash_token(self.keystone_token_id) + self.assertTrue(len(tokens) > 0) + self.assertTrue(keystone_token_uuid in tokens) + + +class MaliciousOAuth1Tests(OAuth1Tests): + + def test_bad_consumer_secret(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer = oauth1.Consumer(consumer_id, "bad_secret") + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + self.post(url, headers=headers, expected_status=500) + + def test_bad_request_token_key(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + self.post(url, headers=headers) + url = self._authorize_request_token("bad_key") + self.put(url, expected_status=404) + + def test_bad_verifier(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + request_token = oauth1.Token(request_key, request_secret) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + verifier = resp.result['token']['oauth_verifier'] + self.assertIsNotNone(verifier) + + request_token.set_verifier("bad verifier") + url, headers = self._create_access_token(consumer, + request_token) + self.post(url, headers=headers, expected_status=401) + + def test_bad_requested_roles(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + "bad_role", + self.project_id) + self.post(url, headers=headers, expected_status=401) + + def test_bad_authorizing_roles(self): + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + consumer = oauth1.Consumer(consumer_id, consumer_secret) + + url, headers = self._create_request_token(consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + + self.identity_api.remove_role_from_user_and_project(self.user_id, + self.project_id, + self.role_id) + url = self._authorize_request_token(request_key) + self.admin_request(path=url, method='PUT', expected_status=404) + + def test_expired_authorizing_request_token(self): + CONF.oauth1.request_token_duration = -1 + + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + self.put(url, expected_status=401) + + def test_expired_creating_keystone_token(self): + CONF.oauth1.access_token_duration = -1 + consumer = self._create_single_consumer() + consumer_id = consumer.get('id') + consumer_secret = consumer.get('secret') + self.consumer = oauth1.Consumer(consumer_id, consumer_secret) + self.assertIsNotNone(self.consumer.key) + + url, headers = self._create_request_token(self.consumer, + self.role_id, + self.project_id) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + request_key = credentials.get('oauth_token')[0] + request_secret = credentials.get('oauth_token_secret')[0] + self.request_token = oauth1.Token(request_key, request_secret) + self.assertIsNotNone(self.request_token.key) + + url = self._authorize_request_token(request_key) + resp = self.put(url, expected_status=200) + self.verifier = resp.result['token']['oauth_verifier'] + + self.request_token.set_verifier(self.verifier) + url, headers = self._create_access_token(self.consumer, + self.request_token) + content = self.post(url, headers=headers) + credentials = urlparse.parse_qs(content.result) + access_key = credentials.get('oauth_token')[0] + access_secret = credentials.get('oauth_token_secret')[0] + self.access_token = oauth1.Token(access_key, access_secret) + self.assertIsNotNone(self.access_token.key) + + url, headers, body = self._get_oauth_token(self.consumer, + self.access_token) + self.post(url, headers=headers, body=body, expected_status=401) |
