diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/default_fixtures.py | 27 | ||||
-rw-r--r-- | tests/test_auth_token_middleware.py | 666 | ||||
-rw-r--r-- | tests/test_backend.py | 225 | ||||
-rw-r--r-- | tests/test_backend_kvs.py | 27 | ||||
-rw-r--r-- | tests/test_backend_ldap.py | 30 | ||||
-rw-r--r-- | tests/test_keystoneclient.py | 21 | ||||
-rw-r--r-- | tests/test_migrate_nova_auth.py | 5 | ||||
-rw-r--r-- | tests/test_policy.py | 50 | ||||
-rw-r--r-- | tests/test_sql_upgrade.py | 108 | ||||
-rw-r--r-- | tests/test_url_middleware.py | 3 | ||||
-rw-r--r-- | tests/test_v3.py | 2 |
11 files changed, 428 insertions, 736 deletions
diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py index e20f9d7a..44fb64fb 100644 --- a/tests/default_fixtures.py +++ b/tests/default_fixtures.py @@ -20,6 +20,9 @@ from keystone import config +CONF = config.CONF + + DEFAULT_DOMAIN_ID = config.CONF.identity.default_domain_id @@ -34,6 +37,12 @@ TENANTS = [ 'domain_id': DEFAULT_DOMAIN_ID, 'description': 'description', 'enabled': True, + }, { + 'id': 'mtu', + 'name': 'MTU', + 'description': 'description', + 'enabled': True, + 'domain_id': DEFAULT_DOMAIN_ID } ] @@ -63,14 +72,20 @@ USERS = [ 'enabled': False, 'tenant_id': 'baz', 'tenants': ['baz'], + }, { + 'id': 'sna', + 'name': 'SNA', + 'domain_id': DEFAULT_DOMAIN_ID, + 'password': 'snafu', + 'enabled': True, + 'tenants': ['bar'] } ] METADATA = [ { - 'user_id': 'foo', - 'tenant_id': 'bar', - 'extra': 'extra', + 'user_id': 'sna', + 'tenant_id': 'mtu', } ] @@ -81,5 +96,11 @@ ROLES = [ }, { 'id': 'member', 'name': 'Member', + }, { + 'id': CONF.member_role_id, + 'name': CONF.member_role_name, + }, { + 'id': 'other', + 'name': 'Other', } ] diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py deleted file mode 100644 index 0a85977d..00000000 --- a/tests/test_auth_token_middleware.py +++ /dev/null @@ -1,666 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2012 OpenStack LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import iso8601 -import os -import string -import tempfile - -import webob - -from keystone.common import cms -from keystone.common import utils -from keystone.middleware import auth_token -from keystone.openstack.common import jsonutils -from keystone.openstack.common import timeutils -from keystone import test - - -CERTDIR = test.rootdir("examples/pki/certs") -KEYDIR = test.rootdir("examples/pki/private") -CMSDIR = test.rootdir("examples/pki/cms") -SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem') -SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem') -CA = os.path.join(CERTDIR, 'ca.pem') - -REVOCATION_LIST = None -REVOKED_TOKEN = None -REVOKED_TOKEN_HASH = None -SIGNED_REVOCATION_LIST = None -SIGNED_TOKEN_SCOPED = None -SIGNED_TOKEN_UNSCOPED = None -SIGNED_TOKEN_SCOPED_KEY = None -SIGNED_TOKEN_UNSCOPED_KEY = None - -VALID_SIGNED_REVOCATION_LIST = None - -UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d" -UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df' -UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776' -VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726' - -INVALID_SIGNED_TOKEN = string.replace( - """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA -BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB -CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC -DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD -EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE -FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF -0000000000000000000000000000000000000000000000000000000000000000 -1111111111111111111111111111111111111111111111111111111111111111 -2222222222222222222222222222222222222222222222222222222222222222 -3333333333333333333333333333333333333333333333333333333333333333 -4444444444444444444444444444444444444444444444444444444444444444 -5555555555555555555555555555555555555555555555555555555555555555 -6666666666666666666666666666666666666666666666666666666666666666 -7777777777777777777777777777777777777777777777777777777777777777 -8888888888888888888888888888888888888888888888888888888888888888 -9999999999999999999999999999999999999999999999999999999999999999 -0000000000000000000000000000000000000000000000000000000000000000 -xg==""", "\n", "") - -# JSON responses keyed by token ID -TOKEN_RESPONSES = { - UUID_TOKEN_DEFAULT: { - 'access': { - 'token': { - 'id': UUID_TOKEN_DEFAULT, - 'expires': '2999-01-01T00:00:10Z', - 'tenant': { - 'id': 'tenant_id1', - 'name': 'tenant_name1', - }, - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - 'serviceCatalog': {} - }, - }, - VALID_DIABLO_TOKEN: { - 'access': { - 'token': { - 'id': VALID_DIABLO_TOKEN, - 'expires': '2999-01-01T00:00:10', - 'tenantId': 'tenant_id1', - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - }, - }, - UUID_TOKEN_UNSCOPED: { - 'access': { - 'token': { - 'id': UUID_TOKEN_UNSCOPED, - 'expires': '2999-01-01T00:00:10Z', - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - }, - }, - UUID_TOKEN_NO_SERVICE_CATALOG: { - 'access': { - 'token': { - 'id': 'valid-token', - 'expires': '2999-01-01T00:00:10Z', - 'tenant': { - 'id': 'tenant_id1', - 'name': 'tenant_name1', - }, - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - } - }, - }, -} - -FAKE_RESPONSE_STACK = [] - - -# The data for these tests are signed using openssl and are stored in files -# in the signing subdirectory. In order to keep the values consistent between -# the tests and the signed documents, we read them in for use in the tests. -def setUpModule(self): - signing_path = CMSDIR - with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f: - self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read()) - with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f: - self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read()) - with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f: - self.REVOKED_TOKEN = cms.cms_to_token(f.read()) - self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN) - with open(os.path.join(signing_path, 'revocation_list.json')) as f: - self.REVOCATION_LIST = jsonutils.loads(f.read()) - with open(os.path.join(signing_path, 'revocation_list.pem')) as f: - self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps( - {'signed': f.read()}) - self.SIGNED_TOKEN_SCOPED_KEY =\ - cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED) - self.SIGNED_TOKEN_UNSCOPED_KEY =\ - cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED) - - self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = { - 'access': { - 'token': { - 'id': self.SIGNED_TOKEN_SCOPED_KEY, - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'tenantId': 'tenant_id1', - 'tenantName': 'tenant_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - }, - } - - self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = { - 'access': { - 'token': { - 'id': SIGNED_TOKEN_UNSCOPED_KEY, - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - }, - }, - - -class FakeMemcache(object): - def __init__(self): - self.set_key = None - self.set_value = None - self.token_expiration = None - - def get(self, key): - data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy() - if not data or key != "tokens/%s" % (data['access']['token']['id']): - return - if not self.token_expiration: - dt = datetime.datetime.now() + datetime.timedelta(minutes=5) - self.token_expiration = dt.strftime("%s") - dt = datetime.datetime.now() + datetime.timedelta(hours=24) - ks_expires = dt.isoformat() - data['access']['token']['expires'] = ks_expires - return (data, str(self.token_expiration)) - - def set(self, key, value, time=None, timeout=None): - self.set_value = value - self.set_key = key - - -class FakeHTTPResponse(object): - def __init__(self, status, body): - self.status = status - self.body = body - - def read(self): - return self.body - - -class FakeStackHTTPConnection(object): - - def __init__(self, *args, **kwargs): - pass - - def getresponse(self): - if len(FAKE_RESPONSE_STACK): - return FAKE_RESPONSE_STACK.pop() - return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE')) - - def request(self, *_args, **_kwargs): - pass - - def close(self): - pass - - -class FakeHTTPConnection(object): - - last_requested_url = '' - - def __init__(self, *args): - self.send_valid_revocation_list = True - - def request(self, method, path, **kwargs): - """Fakes out several http responses. - - If a POST request is made, we assume the calling code is trying - to get a new admin token. - - If a GET request is made to validate a token, return success - if the token is 'token1'. If a different token is provided, return - a 404, indicating an unknown (therefore unauthorized) token. - - """ - FakeHTTPConnection.last_requested_url = path - if method == 'POST': - status = 200 - body = jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - }) - - else: - token_id = path.rsplit('/', 1)[1] - if token_id in TOKEN_RESPONSES.keys(): - status = 200 - body = jsonutils.dumps(TOKEN_RESPONSES[token_id]) - elif token_id == "revoked": - status = 200 - body = SIGNED_REVOCATION_LIST - else: - status = 404 - body = str() - - self.resp = FakeHTTPResponse(status, body) - - def getresponse(self): - return self.resp - - def close(self): - pass - - -class FakeApp(object): - """This represents a WSGI app protected by the auth_token middleware.""" - def __init__(self, expected_env=None): - expected_env = expected_env or {} - self.expected_env = { - 'HTTP_X_IDENTITY_STATUS': 'Confirmed', - 'HTTP_X_TENANT_ID': 'tenant_id1', - 'HTTP_X_TENANT_NAME': 'tenant_name1', - 'HTTP_X_USER_ID': 'user_id1', - 'HTTP_X_USER_NAME': 'user_name1', - 'HTTP_X_ROLES': 'role1,role2', - 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat) - 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat) - 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) - } - self.expected_env.update(expected_env) - - def __call__(self, env, start_response): - for k, v in self.expected_env.items(): - assert env[k] == v, '%s != %s' % (env[k], v) - - resp = webob.Response() - resp.body = 'SUCCESS' - return resp(env, start_response) - - -class BaseAuthTokenMiddlewareTest(test.TestCase): - - def setUp(self, expected_env=None): - expected_env = expected_env or {} - - conf = { - 'admin_token': 'admin_token1', - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'auth_admin_prefix': '/testadmin', - 'signing_dir': CERTDIR, - } - - self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf) - self.middleware.http_client_class = FakeHTTPConnection - self.middleware._iso8601 = iso8601 - - self.response_status = None - self.response_headers = None - self.middleware.revoked_file_name = tempfile.mkstemp()[1] - cache_timeout = datetime.timedelta(days=1) - self.middleware.token_revocation_list_cache_timeout = cache_timeout - self.middleware.token_revocation_list = jsonutils.dumps( - {"revoked": [], "extra": "success"}) - - signed_list = 'SIGNED_REVOCATION_LIST' - valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST' - globals()[signed_list] = globals()[valid_signed_list] - - super(BaseAuthTokenMiddlewareTest, self).setUp() - - def tearDown(self): - super(BaseAuthTokenMiddlewareTest, self).tearDown() - try: - os.remove(self.middleware.revoked_file_name) - except OSError: - pass - - def start_fake_response(self, status, headers): - self.response_status = int(status.split(' ', 1)[0]) - self.response_headers = dict(headers) - - -class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): - """Auth Token middleware test setup that allows the tests to define - a stack of responses to HTTP requests in the test and get those - responses back in sequence for testing. - - Example:: - - resp1 = FakeHTTPResponse(401, jsonutils.dumps('')) - resp2 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - }) - FAKE_RESPONSE_STACK.append(resp1) - FAKE_RESPONSE_STACK.append(resp2) - - ... do your testing code here ... - - """ - - def setUp(self, expected_env=None): - super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env) - self.middleware.http_client_class = FakeStackHTTPConnection - - def test_fetch_revocation_list_with_expire(self): - # first response to revocation list should return 401 Unauthorized - # to pretend to be an expired token - resp1 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - })) - resp2 = FakeHTTPResponse(401, jsonutils.dumps('')) - resp3 = FakeHTTPResponse(200, jsonutils.dumps({ - 'access': { - 'token': {'id': 'admin_token2'}, - }, - })) - resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST) - - # first get_admin_token() call - FAKE_RESPONSE_STACK.append(resp1) - # request revocation list, get "unauthorized" due to simulated expired - # token - FAKE_RESPONSE_STACK.append(resp2) - # request a new admin_token - FAKE_RESPONSE_STACK.append(resp3) - # request revocation list, get the revocation list properly - FAKE_RESPONSE_STACK.append(resp4) - - fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) - self.assertEqual(fetched_list, REVOCATION_LIST) - - -class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): - """Auth Token middleware should understand Diablo keystone responses.""" - def setUp(self): - # pre-diablo only had Tenant ID, which was also the Name - expected_env = { - 'HTTP_X_TENANT_ID': 'tenant_id1', - 'HTTP_X_TENANT_NAME': 'tenant_id1', - # now deprecated (diablo-compat) - 'HTTP_X_TENANT': 'tenant_id1', - } - super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env) - - def test_valid_diablo_response(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - - -class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): - def assert_valid_request_200(self, token): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertTrue(req.headers.get('X-Service-Catalog')) - self.assertEqual(body, ['SUCCESS']) - - def test_valid_uuid_request(self): - self.assert_valid_request_200(UUID_TOKEN_DEFAULT) - self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT, - FakeHTTPConnection.last_requested_url) - - def test_valid_signed_request(self): - FakeHTTPConnection.last_requested_url = '' - self.assert_valid_request_200(SIGNED_TOKEN_SCOPED) - self.assertEqual(self.middleware.conf['auth_admin_prefix'], - "/testadmin") - #ensure that signed requests do not generate HTTP traffic - self.assertEqual('', FakeHTTPConnection.last_requested_url) - - def assert_unscoped_default_tenant_auto_scopes(self, token): - """Unscoped requests with a default tenant should "auto-scope." - - The implied scope is the user's tenant ID. - - """ - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertEqual(body, ['SUCCESS']) - - def test_default_tenant_uuid_token(self): - self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT) - - def test_default_tenant_signed_token(self): - self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED) - - def assert_unscoped_token_receives_401(self, token): - """Unscoped requests with no default tenant ID should be rejected.""" - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - 'Keystone uri=\'https://keystone.example.com:1234\'') - - def test_unscoped_uuid_token_receives_401(self): - self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED) - - def test_unscoped_pki_token_receives_401(self): - self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED) - - def test_revoked_token_receives_401(self): - self.middleware.token_revocation_list = self.get_revocation_list_json() - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = REVOKED_TOKEN - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - - def get_revocation_list_json(self, token_ids=None): - if token_ids is None: - token_ids = [REVOKED_TOKEN_HASH] - revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()} - for x in token_ids]} - return jsonutils.dumps(revocation_list) - - def test_is_signed_token_revoked_returns_false(self): - #explicitly setting an empty revocation list here to document intent - self.middleware.token_revocation_list = jsonutils.dumps( - {"revoked": [], "extra": "success"}) - result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN) - self.assertFalse(result) - - def test_is_signed_token_revoked_returns_true(self): - self.middleware.token_revocation_list = self.get_revocation_list_json() - result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN) - self.assertTrue(result) - - def test_verify_signed_token_raises_exception_for_revoked_token(self): - self.middleware.token_revocation_list = self.get_revocation_list_json() - with self.assertRaises(auth_token.InvalidUserToken): - self.middleware.verify_signed_token(REVOKED_TOKEN) - - def test_verify_signed_token_succeeds_for_unrevoked_token(self): - self.middleware.token_revocation_list = self.get_revocation_list_json() - self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED) - - def test_get_token_revocation_list_fetched_time_returns_min(self): - self.middleware.token_revocation_list_fetched_time = None - self.middleware.revoked_file_name = '' - self.assertEqual(self.middleware.token_revocation_list_fetched_time, - datetime.datetime.min) - - def test_get_token_revocation_list_fetched_time_returns_mtime(self): - self.middleware.token_revocation_list_fetched_time = None - mtime = os.path.getmtime(self.middleware.revoked_file_name) - fetched_time = datetime.datetime.fromtimestamp(mtime) - self.assertEqual(self.middleware.token_revocation_list_fetched_time, - fetched_time) - - def test_get_token_revocation_list_fetched_time_returns_value(self): - expected = self.middleware._token_revocation_list_fetched_time - self.assertEqual(self.middleware.token_revocation_list_fetched_time, - expected) - - def test_get_revocation_list_returns_fetched_list(self): - self.middleware.token_revocation_list_fetched_time = None - os.remove(self.middleware.revoked_file_name) - self.assertEqual(self.middleware.token_revocation_list, - REVOCATION_LIST) - - def test_get_revocation_list_returns_current_list_from_memory(self): - self.assertEqual(self.middleware.token_revocation_list, - self.middleware._token_revocation_list) - - def test_get_revocation_list_returns_current_list_from_disk(self): - in_memory_list = self.middleware.token_revocation_list - self.middleware._token_revocation_list = None - self.assertEqual(self.middleware.token_revocation_list, in_memory_list) - - def test_invalid_revocation_list_raises_service_error(self): - globals()['SIGNED_REVOCATION_LIST'] = "{}" - with self.assertRaises(auth_token.ServiceError): - self.middleware.fetch_revocation_list() - - def test_fetch_revocation_list(self): - fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) - self.assertEqual(fetched_list, REVOCATION_LIST) - - def test_request_invalid_uuid_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = 'invalid-token' - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - 'Keystone uri=\'https://keystone.example.com:1234\'') - - def test_request_invalid_signed_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - 'Keystone uri=\'https://keystone.example.com:1234\'') - - def test_request_no_token(self): - req = webob.Request.blank('/') - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - 'Keystone uri=\'https://keystone.example.com:1234\'') - - def test_request_blank_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = '' - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - 'Keystone uri=\'https://keystone.example.com:1234\'') - - def test_memcache(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED - self.middleware._cache = FakeMemcache() - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.middleware._cache.set_value, None) - - def test_memcache_set_invalid(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = 'invalid-token' - self.middleware._cache = FakeMemcache() - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.middleware._cache.set_value, "invalid") - - def test_memcache_set_expired(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED - self.middleware._cache = FakeMemcache() - expired = datetime.datetime.now() - datetime.timedelta(minutes=1) - self.middleware._cache.token_expiration = float(expired.strftime("%s")) - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(len(self.middleware._cache.set_value), 2) - - def test_nomemcache(self): - self.disable_module('memcache') - - conf = { - 'admin_token': 'admin_token1', - 'auth_host': 'keystone.example.com', - 'auth_port': 1234, - 'memcache_servers': 'localhost:11211', - } - - auth_token.AuthProtocol(FakeApp(), conf) - - def test_request_prevent_service_catalog_injection(self): - req = webob.Request.blank('/') - req.headers['X-Service-Catalog'] = '[]' - req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertFalse(req.headers.get('X-Service-Catalog')) - self.assertEqual(body, ['SUCCESS']) - - def test_will_expire_soon(self): - tenseconds = datetime.datetime.utcnow() + datetime.timedelta( - seconds=10) - self.assertTrue(auth_token.will_expire_soon(tenseconds)) - fortyseconds = datetime.datetime.utcnow() + datetime.timedelta( - seconds=40) - self.assertFalse(auth_token.will_expire_soon(fortyseconds)) diff --git a/tests/test_backend.py b/tests/test_backend.py index 172afbcc..09bc0b8a 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -20,6 +20,7 @@ import uuid import nose.exc from keystone.catalog import core +from keystone import config from keystone import exception from keystone.openstack.common import timeutils from keystone import config @@ -31,6 +32,25 @@ DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class IdentityTests(object): + def test_project_add_and_remove_user_role(self): + user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs]) + + self.identity_api.add_role_to_user_and_project( + tenant_id=self.tenant_bar['id'], + user_id=self.user_two['id'], + role_id=self.role_other['id']) + user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) + self.assertIn(self.user_two['id'], [x['id'] for x in user_refs]) + + self.identity_api.remove_role_from_user_and_project( + tenant_id=self.tenant_bar['id'], + user_id=self.user_two['id'], + role_id=self.role_other['id']) + + user_refs = self.identity_api.get_project_users(self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs]) + def test_authenticate_bad_user(self): self.assertRaises(AssertionError, self.identity_api.authenticate, @@ -66,23 +86,25 @@ class IdentityTests(object): def test_authenticate(self): user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( - user_id=self.user_foo['id'], + user_id=self.user_sna['id'], tenant_id=self.tenant_bar['id'], - password=self.user_foo['password']) + password=self.user_sna['password']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(user_ref, self.user_foo) + self.user_sna.pop('password') + self.user_sna['enabled'] = True + self.assertDictEqual(user_ref, self.user_sna) self.assertDictEqual(tenant_ref, self.tenant_bar) - self.assertDictEqual(metadata_ref, self.metadata_foobar) + metadata_ref.pop('roles') + self.assertDictEqual(metadata_ref, self.metadata_snamtu) def test_authenticate_role_return(self): self.identity_api.add_role_to_user_and_project( - self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin') + self.user_foo['id'], self.tenant_baz['id'], 'keystone_admin') user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate( user_id=self.user_foo['id'], - tenant_id=self.tenant_bar['id'], + tenant_id=self.tenant_baz['id'], password=self.user_foo['password']) self.assertIn('roles', metadata_ref) self.assertIn('keystone_admin', metadata_ref['roles']) @@ -105,7 +127,8 @@ class IdentityTests(object): # it easier to authenticate in tests, but should # not be returned by the api user.pop('password') - self.assertEquals(metadata_ref, {}) + self.assertEquals(metadata_ref, {"roles": + [CONF.member_role_id]}) self.assertDictEqual(user_ref, user) self.assertDictEqual(tenant_ref, self.tenant_baz) @@ -181,9 +204,10 @@ class IdentityTests(object): def test_get_metadata(self): metadata_ref = self.identity_api.get_metadata( - user_id=self.user_foo['id'], + user_id=self.user_sna['id'], tenant_id=self.tenant_bar['id']) - self.assertDictEqual(metadata_ref, self.metadata_foobar) + metadata_ref.pop('roles') + self.assertDictEqual(metadata_ref, self.metadata_snamtu) def test_get_metadata_404(self): # FIXME(dolph): these exceptions could be more specific @@ -261,6 +285,59 @@ class IdentityTests(object): 'fake2', user) + def test_create_duplicate_user_name_in_different_domains(self): + new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(new_domain['id'], new_domain) + user1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': DEFAULT_DOMAIN_ID, + 'password': uuid.uuid4().hex} + user2 = {'id': uuid.uuid4().hex, + 'name': user1['name'], + 'domain_id': new_domain['id'], + 'password': uuid.uuid4().hex} + self.identity_api.create_user(user1['id'], user1) + self.identity_api.create_user(user2['id'], user2) + + def test_move_user_between_domains(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + user = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], + 'password': uuid.uuid4().hex} + self.identity_api.create_user(user['id'], user) + user['domain_id'] = domain2['id'] + self.identity_api.update_user(user['id'], user) + + def test_move_user_between_domains_with_clashing_names_fails(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + # First, create a user in domain1 + user1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], + 'password': uuid.uuid4().hex} + self.identity_api.create_user(user1['id'], user1) + # Now create a user in domain2 with a potentially clashing + # name - which should work since we have domain separation + user2 = {'id': uuid.uuid4().hex, + 'name': user1['name'], + 'domain_id': domain2['id'], + 'password': uuid.uuid4().hex} + self.identity_api.create_user(user2['id'], user2) + # Now try and move user1 into the 2nd domain - which should + # fail since the names clash + user1['domain_id'] = domain2['id'] + self.assertRaises(exception.Conflict, + self.identity_api.update_user, + user1['id'], + user1) + def test_rename_duplicate_user_name_fails(self): user1 = {'id': 'fake1', 'name': 'fake1', @@ -318,6 +395,52 @@ class IdentityTests(object): 'fake1', tenant) + def test_create_duplicate_project_name_in_different_domains(self): + new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(new_domain['id'], new_domain) + tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': DEFAULT_DOMAIN_ID} + tenant2 = {'id': uuid.uuid4().hex, 'name': tenant1['name'], + 'domain_id': new_domain['id']} + self.identity_api.create_project(tenant1['id'], tenant1) + self.identity_api.create_project(tenant2['id'], tenant2) + + def test_move_project_between_domains(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_project(project['id'], project) + project['domain_id'] = domain2['id'] + self.identity_api.update_project(project['id'], project) + + def test_move_project_between_domains_with_clashing_names_fails(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + # First, create a project in domain1 + project1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_project(project1['id'], project1) + # Now create a project in domain2 with a potentially clashing + # name - which should work since we have domain separation + project2 = {'id': uuid.uuid4().hex, + 'name': project1['name'], + 'domain_id': domain2['id']} + self.identity_api.create_project(project2['id'], project2) + # Now try and move project1 into the 2nd domain - which should + # fail since the names clash + project1['domain_id'] = domain2['id'] + self.assertRaises(exception.Conflict, + self.identity_api.update_project, + project1['id'], + project1) + def test_rename_duplicate_project_name_fails(self): tenant1 = {'id': 'fake1', 'name': 'fake1', 'domain_id': DEFAULT_DOMAIN_ID} @@ -421,14 +544,14 @@ class IdentityTests(object): roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_bar['id']) - self.assertEquals(len(roles_ref), 0) + self.assertEquals(len(roles_ref), 1) self.identity_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_bar['id'], role_id='keystone_admin') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_bar['id']) - self.assertDictEqual(roles_ref[0], self.role_keystone_admin) + self.assertDictEqual(roles_ref[1], self.role_keystone_admin) self.identity_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_bar['id'], @@ -475,24 +598,24 @@ class IdentityTests(object): def test_remove_role_grant_from_user_and_project(self): self.identity_api.create_grant(user_id=self.user_foo['id'], - project_id=self.tenant_bar['id'], + project_id=self.tenant_baz['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], - project_id=self.tenant_bar['id']) + project_id=self.tenant_baz['id']) self.assertDictEqual(roles_ref[0], self.role_member) self.identity_api.delete_grant(user_id=self.user_foo['id'], - project_id=self.tenant_bar['id'], + project_id=self.tenant_baz['id'], role_id='member') roles_ref = self.identity_api.list_grants( user_id=self.user_foo['id'], - project_id=self.tenant_bar['id']) + project_id=self.tenant_baz['id']) self.assertEquals(len(roles_ref), 0) self.assertRaises(exception.NotFound, self.identity_api.delete_grant, user_id=self.user_foo['id'], - project_id=self.tenant_bar['id'], + project_id=self.tenant_baz['id'], role_id='member') def test_get_and_remove_role_grant_by_group_and_project(self): @@ -1115,10 +1238,10 @@ class IdentityTests(object): role) def test_add_user_to_project(self): - self.identity_api.add_user_to_project(self.tenant_bar['id'], + self.identity_api.add_user_to_project(self.tenant_baz['id'], self.user_foo['id']) tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) - self.assertIn(self.tenant_bar['id'], tenants) + self.assertIn(self.tenant_baz['id'], tenants) def test_add_user_to_project_404(self): self.assertRaises(exception.ProjectNotFound, @@ -1132,12 +1255,12 @@ class IdentityTests(object): uuid.uuid4().hex) def test_remove_user_from_project(self): - self.identity_api.add_user_to_project(self.tenant_bar['id'], + self.identity_api.add_user_to_project(self.tenant_baz['id'], self.user_foo['id']) - self.identity_api.remove_user_from_project(self.tenant_bar['id'], + self.identity_api.remove_user_from_project(self.tenant_baz['id'], self.user_foo['id']) tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) - self.assertNotIn(self.tenant_bar['id'], tenants) + self.assertNotIn(self.tenant_baz['id'], tenants) def test_remove_user_from_project_404(self): self.assertRaises(exception.ProjectNotFound, @@ -1385,7 +1508,7 @@ class IdentityTests(object): def test_list_projects(self): projects = self.identity_api.list_projects() - self.assertEquals(len(projects), 2) + self.assertEquals(len(projects), 3) project_ids = [] for project in projects: project_ids.append(project.get('id')) @@ -1615,6 +1738,62 @@ class IdentityTests(object): self.identity_api.get_group, group['id']) + def test_create_duplicate_group_name_fails(self): + group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, + 'name': uuid.uuid4().hex} + group2 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, + 'name': group1['name']} + self.identity_api.create_group(group1['id'], group1) + self.assertRaises(exception.Conflict, + self.identity_api.create_group, + group2['id'], group2) + + def test_create_duplicate_group_name_in_different_domains(self): + new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(new_domain['id'], new_domain) + group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID, + 'name': uuid.uuid4().hex} + group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'], + 'name': group1['name']} + self.identity_api.create_group(group1['id'], group1) + self.identity_api.create_group(group2['id'], group2) + + def test_move_group_between_domains(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + group = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_group(group['id'], group) + group['domain_id'] = domain2['id'] + self.identity_api.update_group(group['id'], group) + + def test_move_group_between_domains_with_clashing_names_fails(self): + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain1['id'], domain1) + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.identity_api.create_domain(domain2['id'], domain2) + # First, create a group in domain1 + group1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id']} + self.identity_api.create_group(group1['id'], group1) + # Now create a group in domain2 with a potentially clashing + # name - which should work since we have domain separation + group2 = {'id': uuid.uuid4().hex, + 'name': group1['name'], + 'domain_id': domain2['id']} + self.identity_api.create_group(group2['id'], group2) + # Now try and move group1 into the 2nd domain - which should + # fail since the names clash + group1['domain_id'] = domain2['id'] + self.assertRaises(exception.Conflict, + self.identity_api.update_group, + group1['id'], + group1) + def test_project_crud(self): project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex} diff --git a/tests/test_backend_kvs.py b/tests/test_backend_kvs.py index 9a733ae2..e1d99d47 100644 --- a/tests/test_backend_kvs.py +++ b/tests/test_backend_kvs.py @@ -37,6 +37,33 @@ class KvsIdentity(test.TestCase, test_backend.IdentityTests): # NOTE(chungg): not implemented raise nose.exc.SkipTest('Blocked by bug 1119770') + def test_create_duplicate_group_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_create_duplicate_user_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_create_duplicate_project_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_user_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_user_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_group_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_group_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_project_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + + def test_move_project_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1119770') + class KvsToken(test.TestCase, test_backend.TokenTests): def setUp(self): diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py index 8a116cfa..63499bd1 100644 --- a/tests/test_backend_ldap.py +++ b/tests/test_backend_ldap.py @@ -450,3 +450,33 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests): def test_get_project_users(self): raise nose.exc.SkipTest('Blocked by bug 1101287') + + def test_create_duplicate_user_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_create_duplicate_project_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_create_duplicate_group_name_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1092187') + + def test_create_duplicate_group_name_in_different_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_user_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_user_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_group_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_group_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_project_between_domains(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') + + def test_move_project_between_domains_with_clashing_names_fails(self): + raise nose.exc.SkipTest('Blocked by bug 1101276') diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py index 3863ff54..14089759 100644 --- a/tests/test_keystoneclient.py +++ b/tests/test_keystoneclient.py @@ -804,16 +804,19 @@ class KcMasterTestCase(CompatTestCase, KeystoneClientTests): def test_tenant_add_and_remove_user(self): client = self.get_client(admin=True) - client.roles.add_user_role(tenant=self.tenant_baz['id'], + client.roles.add_user_role(tenant=self.tenant_bar['id'], user=self.user_two['id'], - role=self.role_member['id']) - user_refs = client.tenants.list_users(tenant=self.tenant_baz['id']) + role=self.role_other['id']) + user_refs = client.tenants.list_users(tenant=self.tenant_bar['id']) self.assert_(self.user_two['id'] in [x.id for x in user_refs]) - client.roles.remove_user_role(tenant=self.tenant_baz['id'], + client.roles.remove_user_role(tenant=self.tenant_bar['id'], user=self.user_two['id'], - role=self.role_member['id']) - user_refs = client.tenants.list_users(tenant=self.tenant_baz['id']) - self.assert_(self.user_two['id'] not in [x.id for x in user_refs]) + role=self.role_other['id']) + roles = client.roles.roles_for_user(user=self.user_foo['id'], + tenant=self.tenant_bar['id']) + self.assertNotIn(self.role_other['id'], roles) + user_refs = client.tenants.list_users(tenant=self.tenant_bar['id']) + self.assertNotIn(self.user_two['id'], [x.id for x in user_refs]) def test_user_role_add_404(self): from keystoneclient import exceptions as client_exceptions @@ -1013,7 +1016,7 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests): def test_tenant_add_and_remove_user(self): client = self.get_client(admin=True) - client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'], + client.roles.add_user_to_tenant(tenant_id=self.tenant_bar['id'], user_id=self.user_two['id'], role_id=self.role_member['id']) role_refs = client.roles.get_user_role_refs( @@ -1030,7 +1033,7 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests): # use python's scope fall through to leave roleref_ref set break - client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'], + client.roles.remove_user_from_tenant(tenant_id=self.tenant_bar['id'], user_id=self.user_two['id'], role_id=roleref_ref.id) diff --git a/tests/test_migrate_nova_auth.py b/tests/test_migrate_nova_auth.py index 3a257dff..4e3e37b8 100644 --- a/tests/test_migrate_nova_auth.py +++ b/tests/test_migrate_nova_auth.py @@ -131,7 +131,8 @@ class MigrateNovaAuth(test.TestCase): roles = self.identity_api.list_roles() role_names = set([role['name'] for role in roles]) - self.assertEqual(role_names, set(['role2', 'role1', 'role3'])) + self.assertEqual(role_names, set(['role2', 'role1', 'role3', + CONF.member_role_name])) assignment_map = { 'user1': {'proj1': ['role1', 'role2']}, @@ -149,5 +150,7 @@ class MigrateNovaAuth(test.TestCase): user['id'], tenant['id']) actual = [self.identity_api.get_role(role_id)['name'] for role_id in roles] + if CONF.member_role_name in actual: + actual.remove(CONF.member_role_name) expected = old_project_map.get(tenant_name, []) self.assertEqual(set(actual), set(expected)) diff --git a/tests/test_policy.py b/tests/test_policy.py index b4b11ad3..6895958c 100644 --- a/tests/test_policy.py +++ b/tests/test_policy.py @@ -19,11 +19,11 @@ import StringIO import tempfile import urllib2 -from keystone.common import policy as common_policy from keystone import config from keystone import exception -from keystone.policy.backends import rules from keystone import test +from keystone.openstack.common import policy as common_policy +from keystone.policy.backends import rules CONF = config.CONF @@ -51,7 +51,7 @@ class PolicyFileTestCase(test.TestCase): policyfile.write("""{"example:test": ["false:false"]}""") # NOTE(vish): reset stored policy cache so we don't have to sleep(1) rules._POLICY_CACHE = {} - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, empty_credentials, action, self.target) @@ -61,7 +61,7 @@ class PolicyTestCase(test.TestCase): rules.reset() # NOTE(vish): preload rules to circumvent reloading from file rules.init() - brain = { + self.rules = { "true": [], "example:allowed": [], "example:denied": [["false:false"]], @@ -73,23 +73,30 @@ class PolicyTestCase(test.TestCase): "example:lowercase_admin": [["role:admin"], ["role:sysadmin"]], "example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]], } - # NOTE(vish): then overload underlying brain - common_policy.set_brain(common_policy.HttpBrain(brain)) + + # NOTE(vish): then overload underlying policy engine + self._set_rules() self.credentials = {} self.target = {} + def _set_rules(self): + these_rules = common_policy.Rules( + dict((k, common_policy.parse_rule(v)) + for k, v in self.rules.items())) + common_policy.set_rules(these_rules) + def tearDown(self): rules.reset() super(PolicyTestCase, self).tearDown() def test_enforce_nonexistent_action_throws(self): action = "example:noexist" - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, action, self.target) def test_enforce_bad_action_throws(self): action = "example:denied" - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, action, self.target) def test_enforce_good_action(self): @@ -105,7 +112,7 @@ class PolicyTestCase(test.TestCase): action = "example:get_http" target = {} result = rules.enforce(self.credentials, action, target) - self.assertEqual(result, None) + self.assertTrue(result) def test_enforce_http_false(self): @@ -114,7 +121,7 @@ class PolicyTestCase(test.TestCase): self.stubs.Set(urllib2, 'urlopen', fakeurlopen) action = "example:get_http" target = {} - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, action, target) def test_templatized_enforcement(self): @@ -123,12 +130,12 @@ class PolicyTestCase(test.TestCase): credentials = {'project_id': 'fake', 'roles': []} action = "example:my_file" rules.enforce(credentials, action, target_mine) - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, credentials, action, target_not_mine) def test_early_AND_enforcement(self): action = "example:early_and_fail" - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, action, self.target) def test_early_OR_enforcement(self): @@ -151,30 +158,31 @@ class DefaultPolicyTestCase(test.TestCase): rules.reset() rules.init() - self.brain = { + self.rules = { "default": [], "example:exist": [["false:false"]] } - - self._set_brain('default') + self._set_rules('default') self.credentials = {} - def _set_brain(self, default_rule): - brain = common_policy.HttpBrain(self.brain, default_rule) - common_policy.set_brain(brain) + def _set_rules(self, default_rule): + these_rules = common_policy.Rules( + dict((k, common_policy.parse_rule(v)) + for k, v in self.rules.items()), default_rule) + common_policy.set_rules(these_rules) def tearDown(self): super(DefaultPolicyTestCase, self).setUp() rules.reset() def test_policy_called(self): - self.assertRaises(exception.Forbidden, rules.enforce, + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, "example:exist", {}) def test_not_found_policy_calls_default(self): rules.enforce(self.credentials, "example:noexist", {}) def test_default_not_found(self): - self._set_brain("default_noexist") - self.assertRaises(exception.Forbidden, rules.enforce, + self._set_rules("default_noexist") + self.assertRaises(exception.ForbiddenAction, rules.enforce, self.credentials, "example:noexist", {}) diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index d204492c..85ea7580 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -56,9 +56,9 @@ class SqlUpgradeTests(test.TestCase): self.config([test.etcdir('keystone.conf.sample'), test.testsdir('test_overrides.conf'), test.testsdir('backend_sql.conf')]) + self.base = sql.Base() # create and share a single sqlalchemy engine for testing - self.base = sql.Base() self.engine = self.base.get_engine(allow_global_engine=False) self.Session = self.base.get_sessionmaker(engine=self.engine, autocommit=False) @@ -107,7 +107,7 @@ class SqlUpgradeTests(test.TestCase): actual_cols = [col.name for col in table.columns] self.assertEqual(expected_cols, actual_cols, '%s table' % table_name) - def test_upgrade_0_to_1(self): + def test_upgrade_add_initial_tables(self): self.upgrade(1) self.assertTableColumns("user", ["id", "name", "extra"]) self.assertTableColumns("tenant", ["id", "name", "extra"]) @@ -117,7 +117,7 @@ class SqlUpgradeTests(test.TestCase): self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"]) self.populate_user_table() - def test_upgrade_5_to_6(self): + def test_upgrade_add_policy(self): self.upgrade(5) self.assertTableDoesNotExist('policy') @@ -125,7 +125,7 @@ class SqlUpgradeTests(test.TestCase): self.assertTableExists('policy') self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra']) - def test_upgrade_8_to_10(self): + def test_upgrade_normalize_identity(self): self.upgrade(8) self.populate_user_table() self.populate_tenant_table() @@ -179,7 +179,7 @@ class SqlUpgradeTests(test.TestCase): session.commit() session.close() - def test_upgrade_10_to_13(self): + def test_upgrade_endpoints(self): self.upgrade(10) service_extra = { 'name': uuid.uuid4().hex, @@ -266,7 +266,7 @@ class SqlUpgradeTests(test.TestCase): self.downgrade(14) self.assertTenantTables() - def test_upgrade_13_to_14(self): + def test_upgrade_add_group_tables(self): self.upgrade(13) self.upgrade(14) self.assertTableExists('group') @@ -279,6 +279,7 @@ class SqlUpgradeTests(test.TestCase): self.populate_user_table(with_pass_enab=True) self.populate_tenant_table(with_desc_enab=True) self.upgrade(16) + self.assertTableColumns("user", ["id", "name", "extra", "password", "enabled", "domain_id"]) @@ -299,9 +300,12 @@ class SqlUpgradeTests(test.TestCase): self.assertEqual(a_project.description, default_fixtures.TENANTS[1]['description']) self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID) + session.commit() session.close() + self.check_uniqueness_constraints() + def test_downgrade_16_to_14(self): self.upgrade(16) self.populate_user_table(with_pass_enab_domain=True) @@ -327,7 +331,7 @@ class SqlUpgradeTests(test.TestCase): session.commit() session.close() - def test_downgrade_14_to_13(self): + def test_downgrade_remove_group_tables(self): self.upgrade(14) self.downgrade(13) self.assertTableDoesNotExist('group') @@ -335,7 +339,7 @@ class SqlUpgradeTests(test.TestCase): self.assertTableDoesNotExist('group_domain_metadata') self.assertTableDoesNotExist('user_group_membership') - def test_downgrade_13_to_10(self): + def test_downgrade_endpoints(self): self.upgrade(13) service_extra = { @@ -420,7 +424,7 @@ class SqlUpgradeTests(test.TestCase): "metadata"]: self.assertTableDoesNotExist(table_name) - def test_upgrade_6_to_7(self): + def test_upgrade_add_domain_tables(self): self.upgrade(6) self.assertTableDoesNotExist('credential') self.assertTableDoesNotExist('domain') @@ -436,6 +440,92 @@ class SqlUpgradeTests(test.TestCase): self.assertTableColumns('user_domain_metadata', ['user_id', 'domain_id', 'data']) + def test_upgrade_default_roles(self): + def count_member_roles(): + session = self.Session() + query_string = ("select count(*) as c from role " + "where name='%s'" % config.CONF.member_role_name) + role_count = session.execute(query_string).fetchone()['c'] + session.close() + return role_count + + self.upgrade(16) + self.assertEquals(0, count_member_roles()) + self.upgrade(17) + self.assertEquals(1, count_member_roles()) + self.downgrade(16) + self.assertEquals(0, count_member_roles()) + + def check_uniqueness_constraints(self): + # Check uniqueness constraints for User & Project tables are + # correct following schema modification. The Group table's + # schema is never modified, so we don't bother to check that. + domain_table = sqlalchemy.Table('domain', + self.metadata, + autoload=True) + domain1 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + domain2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'enabled': True} + cmd = domain_table.insert().values(domain1) + self.engine.execute(cmd) + cmd = domain_table.insert().values(domain2) + self.engine.execute(cmd) + + # First, the User table. + this_table = sqlalchemy.Table('user', + self.metadata, + autoload=True) + user = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], + 'password': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + cmd = this_table.insert().values(user) + self.engine.execute(cmd) + # now insert a user with the same name into a different + # domain - which should work. + user['id'] = uuid.uuid4().hex + user['domain_id'] = domain2['id'] + cmd = this_table.insert().values(user) + self.engine.execute(cmd) + # TODO(henry-nash). For now, as part of clean-up we + # delete one of these users. Although not part of this test, + # unless we do so the downgrade(16->15) that is part of + # teardown with fail due to having two uses with clashing + # name as we try to revert to a single global name space. This + # limitation is raised as Bug #1125046 and the delete + # could be removed depending on how that bug is resolved. + cmd = this_table.delete(id=user['id']) + self.engine.execute(cmd) + + # Now, the Project table. + this_table = sqlalchemy.Table('project', + self.metadata, + autoload=True) + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain1['id'], + 'description': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + cmd = this_table.insert().values(project) + self.engine.execute(cmd) + # now insert a project with the same name into a different + # domain - which should work. + project['id'] = uuid.uuid4().hex + project['domain_id'] = domain2['id'] + cmd = this_table.insert().values(project) + self.engine.execute(cmd) + # TODO(henry-nash) For now, we delete one of the projects for + # the same reason as we delete one of the users (Bug #1125046). + # This delete could be removed depending on that bug resolution. + cmd = this_table.delete(id=project['id']) + self.engine.execute(cmd) + def populate_user_table(self, with_pass_enab=False, with_pass_enab_domain=False): # Populate the appropriate fields in the user diff --git a/tests/test_url_middleware.py b/tests/test_url_middleware.py index 19f6141c..d5c2eef1 100644 --- a/tests/test_url_middleware.py +++ b/tests/test_url_middleware.py @@ -19,8 +19,6 @@ import webob from keystone import middleware from keystone import test -import test_auth_token_middleware as test_atm - class FakeApp(object): """Fakes a WSGI app URL normalized.""" @@ -33,7 +31,6 @@ class FakeApp(object): class UrlMiddlewareTest(test.TestCase): def setUp(self): self.middleware = middleware.NormalizingFilter(FakeApp()) - self.middleware.http_client_class = test_atm.FakeHTTPConnection self.response_status = None self.response_headers = None super(UrlMiddlewareTest, self).setUp() diff --git a/tests/test_v3.py b/tests/test_v3.py index 28475e14..f5602035 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -185,7 +185,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase): keys = ['name', 'description', 'enabled'] for k in ['id'] + keys: - msg = '%s unnexpectedly None in %s' % (k, entity) + msg = '%s unexpectedly None in %s' % (k, entity) self.assertIsNotNone(entity.get(k), msg) self.assertIsNotNone(entity.get('links')) |