summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/default_fixtures.py27
-rw-r--r--tests/test_auth_token_middleware.py666
-rw-r--r--tests/test_backend.py225
-rw-r--r--tests/test_backend_kvs.py27
-rw-r--r--tests/test_backend_ldap.py30
-rw-r--r--tests/test_keystoneclient.py21
-rw-r--r--tests/test_migrate_nova_auth.py5
-rw-r--r--tests/test_policy.py50
-rw-r--r--tests/test_sql_upgrade.py108
-rw-r--r--tests/test_url_middleware.py3
-rw-r--r--tests/test_v3.py2
11 files changed, 428 insertions, 736 deletions
diff --git a/tests/default_fixtures.py b/tests/default_fixtures.py
index e20f9d7a..44fb64fb 100644
--- a/tests/default_fixtures.py
+++ b/tests/default_fixtures.py
@@ -20,6 +20,9 @@
from keystone import config
+CONF = config.CONF
+
+
DEFAULT_DOMAIN_ID = config.CONF.identity.default_domain_id
@@ -34,6 +37,12 @@ TENANTS = [
'domain_id': DEFAULT_DOMAIN_ID,
'description': 'description',
'enabled': True,
+ }, {
+ 'id': 'mtu',
+ 'name': 'MTU',
+ 'description': 'description',
+ 'enabled': True,
+ 'domain_id': DEFAULT_DOMAIN_ID
}
]
@@ -63,14 +72,20 @@ USERS = [
'enabled': False,
'tenant_id': 'baz',
'tenants': ['baz'],
+ }, {
+ 'id': 'sna',
+ 'name': 'SNA',
+ 'domain_id': DEFAULT_DOMAIN_ID,
+ 'password': 'snafu',
+ 'enabled': True,
+ 'tenants': ['bar']
}
]
METADATA = [
{
- 'user_id': 'foo',
- 'tenant_id': 'bar',
- 'extra': 'extra',
+ 'user_id': 'sna',
+ 'tenant_id': 'mtu',
}
]
@@ -81,5 +96,11 @@ ROLES = [
}, {
'id': 'member',
'name': 'Member',
+ }, {
+ 'id': CONF.member_role_id,
+ 'name': CONF.member_role_name,
+ }, {
+ 'id': 'other',
+ 'name': 'Other',
}
]
diff --git a/tests/test_auth_token_middleware.py b/tests/test_auth_token_middleware.py
deleted file mode 100644
index 0a85977d..00000000
--- a/tests/test_auth_token_middleware.py
+++ /dev/null
@@ -1,666 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import iso8601
-import os
-import string
-import tempfile
-
-import webob
-
-from keystone.common import cms
-from keystone.common import utils
-from keystone.middleware import auth_token
-from keystone.openstack.common import jsonutils
-from keystone.openstack.common import timeutils
-from keystone import test
-
-
-CERTDIR = test.rootdir("examples/pki/certs")
-KEYDIR = test.rootdir("examples/pki/private")
-CMSDIR = test.rootdir("examples/pki/cms")
-SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
-SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
-CA = os.path.join(CERTDIR, 'ca.pem')
-
-REVOCATION_LIST = None
-REVOKED_TOKEN = None
-REVOKED_TOKEN_HASH = None
-SIGNED_REVOCATION_LIST = None
-SIGNED_TOKEN_SCOPED = None
-SIGNED_TOKEN_UNSCOPED = None
-SIGNED_TOKEN_SCOPED_KEY = None
-SIGNED_TOKEN_UNSCOPED_KEY = None
-
-VALID_SIGNED_REVOCATION_LIST = None
-
-UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
-UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
-UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
-VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
-
-INVALID_SIGNED_TOKEN = string.replace(
- """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
-BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
-CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
-DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
-EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
-0000000000000000000000000000000000000000000000000000000000000000
-1111111111111111111111111111111111111111111111111111111111111111
-2222222222222222222222222222222222222222222222222222222222222222
-3333333333333333333333333333333333333333333333333333333333333333
-4444444444444444444444444444444444444444444444444444444444444444
-5555555555555555555555555555555555555555555555555555555555555555
-6666666666666666666666666666666666666666666666666666666666666666
-7777777777777777777777777777777777777777777777777777777777777777
-8888888888888888888888888888888888888888888888888888888888888888
-9999999999999999999999999999999999999999999999999999999999999999
-0000000000000000000000000000000000000000000000000000000000000000
-xg==""", "\n", "")
-
-# JSON responses keyed by token ID
-TOKEN_RESPONSES = {
- UUID_TOKEN_DEFAULT: {
- 'access': {
- 'token': {
- 'id': UUID_TOKEN_DEFAULT,
- 'expires': '2999-01-01T00:00:10Z',
- 'tenant': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- },
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- 'serviceCatalog': {}
- },
- },
- VALID_DIABLO_TOKEN: {
- 'access': {
- 'token': {
- 'id': VALID_DIABLO_TOKEN,
- 'expires': '2999-01-01T00:00:10',
- 'tenantId': 'tenant_id1',
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- },
- },
- UUID_TOKEN_UNSCOPED: {
- 'access': {
- 'token': {
- 'id': UUID_TOKEN_UNSCOPED,
- 'expires': '2999-01-01T00:00:10Z',
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- },
- },
- UUID_TOKEN_NO_SERVICE_CATALOG: {
- 'access': {
- 'token': {
- 'id': 'valid-token',
- 'expires': '2999-01-01T00:00:10Z',
- 'tenant': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- },
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- }
- },
- },
-}
-
-FAKE_RESPONSE_STACK = []
-
-
-# The data for these tests are signed using openssl and are stored in files
-# in the signing subdirectory. In order to keep the values consistent between
-# the tests and the signed documents, we read them in for use in the tests.
-def setUpModule(self):
- signing_path = CMSDIR
- with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
- self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
- with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
- self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
- with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
- self.REVOKED_TOKEN = cms.cms_to_token(f.read())
- self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
- with open(os.path.join(signing_path, 'revocation_list.json')) as f:
- self.REVOCATION_LIST = jsonutils.loads(f.read())
- with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
- self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
- {'signed': f.read()})
- self.SIGNED_TOKEN_SCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
- self.SIGNED_TOKEN_UNSCOPED_KEY =\
- cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
-
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
- 'access': {
- 'token': {
- 'id': self.SIGNED_TOKEN_SCOPED_KEY,
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'tenantId': 'tenant_id1',
- 'tenantName': 'tenant_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- },
- }
-
- self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
- 'access': {
- 'token': {
- 'id': SIGNED_TOKEN_UNSCOPED_KEY,
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- },
- },
-
-
-class FakeMemcache(object):
- def __init__(self):
- self.set_key = None
- self.set_value = None
- self.token_expiration = None
-
- def get(self, key):
- data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
- if not data or key != "tokens/%s" % (data['access']['token']['id']):
- return
- if not self.token_expiration:
- dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
- self.token_expiration = dt.strftime("%s")
- dt = datetime.datetime.now() + datetime.timedelta(hours=24)
- ks_expires = dt.isoformat()
- data['access']['token']['expires'] = ks_expires
- return (data, str(self.token_expiration))
-
- def set(self, key, value, time=None, timeout=None):
- self.set_value = value
- self.set_key = key
-
-
-class FakeHTTPResponse(object):
- def __init__(self, status, body):
- self.status = status
- self.body = body
-
- def read(self):
- return self.body
-
-
-class FakeStackHTTPConnection(object):
-
- def __init__(self, *args, **kwargs):
- pass
-
- def getresponse(self):
- if len(FAKE_RESPONSE_STACK):
- return FAKE_RESPONSE_STACK.pop()
- return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
-
- def request(self, *_args, **_kwargs):
- pass
-
- def close(self):
- pass
-
-
-class FakeHTTPConnection(object):
-
- last_requested_url = ''
-
- def __init__(self, *args):
- self.send_valid_revocation_list = True
-
- def request(self, method, path, **kwargs):
- """Fakes out several http responses.
-
- If a POST request is made, we assume the calling code is trying
- to get a new admin token.
-
- If a GET request is made to validate a token, return success
- if the token is 'token1'. If a different token is provided, return
- a 404, indicating an unknown (therefore unauthorized) token.
-
- """
- FakeHTTPConnection.last_requested_url = path
- if method == 'POST':
- status = 200
- body = jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- })
-
- else:
- token_id = path.rsplit('/', 1)[1]
- if token_id in TOKEN_RESPONSES.keys():
- status = 200
- body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
- elif token_id == "revoked":
- status = 200
- body = SIGNED_REVOCATION_LIST
- else:
- status = 404
- body = str()
-
- self.resp = FakeHTTPResponse(status, body)
-
- def getresponse(self):
- return self.resp
-
- def close(self):
- pass
-
-
-class FakeApp(object):
- """This represents a WSGI app protected by the auth_token middleware."""
- def __init__(self, expected_env=None):
- expected_env = expected_env or {}
- self.expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_name1',
- 'HTTP_X_USER_ID': 'user_id1',
- 'HTTP_X_USER_NAME': 'user_name1',
- 'HTTP_X_ROLES': 'role1,role2',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
- }
- self.expected_env.update(expected_env)
-
- def __call__(self, env, start_response):
- for k, v in self.expected_env.items():
- assert env[k] == v, '%s != %s' % (env[k], v)
-
- resp = webob.Response()
- resp.body = 'SUCCESS'
- return resp(env, start_response)
-
-
-class BaseAuthTokenMiddlewareTest(test.TestCase):
-
- def setUp(self, expected_env=None):
- expected_env = expected_env or {}
-
- conf = {
- 'admin_token': 'admin_token1',
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'auth_admin_prefix': '/testadmin',
- 'signing_dir': CERTDIR,
- }
-
- self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
- self.middleware.http_client_class = FakeHTTPConnection
- self.middleware._iso8601 = iso8601
-
- self.response_status = None
- self.response_headers = None
- self.middleware.revoked_file_name = tempfile.mkstemp()[1]
- cache_timeout = datetime.timedelta(days=1)
- self.middleware.token_revocation_list_cache_timeout = cache_timeout
- self.middleware.token_revocation_list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
-
- signed_list = 'SIGNED_REVOCATION_LIST'
- valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
- globals()[signed_list] = globals()[valid_signed_list]
-
- super(BaseAuthTokenMiddlewareTest, self).setUp()
-
- def tearDown(self):
- super(BaseAuthTokenMiddlewareTest, self).tearDown()
- try:
- os.remove(self.middleware.revoked_file_name)
- except OSError:
- pass
-
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
-
-
-class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
- """Auth Token middleware test setup that allows the tests to define
- a stack of responses to HTTP requests in the test and get those
- responses back in sequence for testing.
-
- Example::
-
- resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
- resp2 = FakeHTTPResponse(200, jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- })
- FAKE_RESPONSE_STACK.append(resp1)
- FAKE_RESPONSE_STACK.append(resp2)
-
- ... do your testing code here ...
-
- """
-
- def setUp(self, expected_env=None):
- super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
- self.middleware.http_client_class = FakeStackHTTPConnection
-
- def test_fetch_revocation_list_with_expire(self):
- # first response to revocation list should return 401 Unauthorized
- # to pretend to be an expired token
- resp1 = FakeHTTPResponse(200, jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- }))
- resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
- resp3 = FakeHTTPResponse(200, jsonutils.dumps({
- 'access': {
- 'token': {'id': 'admin_token2'},
- },
- }))
- resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
-
- # first get_admin_token() call
- FAKE_RESPONSE_STACK.append(resp1)
- # request revocation list, get "unauthorized" due to simulated expired
- # token
- FAKE_RESPONSE_STACK.append(resp2)
- # request a new admin_token
- FAKE_RESPONSE_STACK.append(resp3)
- # request revocation list, get the revocation list properly
- FAKE_RESPONSE_STACK.append(resp4)
-
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, REVOCATION_LIST)
-
-
-class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
- """Auth Token middleware should understand Diablo keystone responses."""
- def setUp(self):
- # pre-diablo only had Tenant ID, which was also the Name
- expected_env = {
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_id1',
- # now deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_id1',
- }
- super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
-
- def test_valid_diablo_response(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
-
-class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
- def assert_valid_request_200(self, token):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertTrue(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, ['SUCCESS'])
-
- def test_valid_uuid_request(self):
- self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
- self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
- FakeHTTPConnection.last_requested_url)
-
- def test_valid_signed_request(self):
- FakeHTTPConnection.last_requested_url = ''
- self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
- self.assertEqual(self.middleware.conf['auth_admin_prefix'],
- "/testadmin")
- #ensure that signed requests do not generate HTTP traffic
- self.assertEqual('', FakeHTTPConnection.last_requested_url)
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped requests with a default tenant should "auto-scope."
-
- The implied scope is the user's tenant ID.
-
- """
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, ['SUCCESS'])
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
-
- def test_revoked_token_receives_401(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = REVOKED_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
-
- def get_revocation_list_json(self, token_ids=None):
- if token_ids is None:
- token_ids = [REVOKED_TOKEN_HASH]
- revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
- for x in token_ids]}
- return jsonutils.dumps(revocation_list)
-
- def test_is_signed_token_revoked_returns_false(self):
- #explicitly setting an empty revocation list here to document intent
- self.middleware.token_revocation_list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
- self.assertFalse(result)
-
- def test_is_signed_token_revoked_returns_true(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
- self.assertTrue(result)
-
- def test_verify_signed_token_raises_exception_for_revoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- with self.assertRaises(auth_token.InvalidUserToken):
- self.middleware.verify_signed_token(REVOKED_TOKEN)
-
- def test_verify_signed_token_succeeds_for_unrevoked_token(self):
- self.middleware.token_revocation_list = self.get_revocation_list_json()
- self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
-
- def test_get_token_revocation_list_fetched_time_returns_min(self):
- self.middleware.token_revocation_list_fetched_time = None
- self.middleware.revoked_file_name = ''
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
- datetime.datetime.min)
-
- def test_get_token_revocation_list_fetched_time_returns_mtime(self):
- self.middleware.token_revocation_list_fetched_time = None
- mtime = os.path.getmtime(self.middleware.revoked_file_name)
- fetched_time = datetime.datetime.fromtimestamp(mtime)
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
- fetched_time)
-
- def test_get_token_revocation_list_fetched_time_returns_value(self):
- expected = self.middleware._token_revocation_list_fetched_time
- self.assertEqual(self.middleware.token_revocation_list_fetched_time,
- expected)
-
- def test_get_revocation_list_returns_fetched_list(self):
- self.middleware.token_revocation_list_fetched_time = None
- os.remove(self.middleware.revoked_file_name)
- self.assertEqual(self.middleware.token_revocation_list,
- REVOCATION_LIST)
-
- def test_get_revocation_list_returns_current_list_from_memory(self):
- self.assertEqual(self.middleware.token_revocation_list,
- self.middleware._token_revocation_list)
-
- def test_get_revocation_list_returns_current_list_from_disk(self):
- in_memory_list = self.middleware.token_revocation_list
- self.middleware._token_revocation_list = None
- self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
-
- def test_invalid_revocation_list_raises_service_error(self):
- globals()['SIGNED_REVOCATION_LIST'] = "{}"
- with self.assertRaises(auth_token.ServiceError):
- self.middleware.fetch_revocation_list()
-
- def test_fetch_revocation_list(self):
- fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
- self.assertEqual(fetched_list, REVOCATION_LIST)
-
- def test_request_invalid_uuid_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_request_invalid_signed_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_request_no_token(self):
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_request_blank_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ''
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- 'Keystone uri=\'https://keystone.example.com:1234\'')
-
- def test_memcache(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.middleware._cache.set_value, None)
-
- def test_memcache_set_invalid(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware._cache = FakeMemcache()
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.middleware._cache.set_value, "invalid")
-
- def test_memcache_set_expired(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
- self.middleware._cache = FakeMemcache()
- expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
- self.middleware._cache.token_expiration = float(expired.strftime("%s"))
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(len(self.middleware._cache.set_value), 2)
-
- def test_nomemcache(self):
- self.disable_module('memcache')
-
- conf = {
- 'admin_token': 'admin_token1',
- 'auth_host': 'keystone.example.com',
- 'auth_port': 1234,
- 'memcache_servers': 'localhost:11211',
- }
-
- auth_token.AuthProtocol(FakeApp(), conf)
-
- def test_request_prevent_service_catalog_injection(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
- req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertFalse(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, ['SUCCESS'])
-
- def test_will_expire_soon(self):
- tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
- seconds=10)
- self.assertTrue(auth_token.will_expire_soon(tenseconds))
- fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
- seconds=40)
- self.assertFalse(auth_token.will_expire_soon(fortyseconds))
diff --git a/tests/test_backend.py b/tests/test_backend.py
index 172afbcc..09bc0b8a 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -20,6 +20,7 @@ import uuid
import nose.exc
from keystone.catalog import core
+from keystone import config
from keystone import exception
from keystone.openstack.common import timeutils
from keystone import config
@@ -31,6 +32,25 @@ DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class IdentityTests(object):
+ def test_project_add_and_remove_user_role(self):
+ user_refs = self.identity_api.get_project_users(self.tenant_bar['id'])
+ self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs])
+
+ self.identity_api.add_role_to_user_and_project(
+ tenant_id=self.tenant_bar['id'],
+ user_id=self.user_two['id'],
+ role_id=self.role_other['id'])
+ user_refs = self.identity_api.get_project_users(self.tenant_bar['id'])
+ self.assertIn(self.user_two['id'], [x['id'] for x in user_refs])
+
+ self.identity_api.remove_role_from_user_and_project(
+ tenant_id=self.tenant_bar['id'],
+ user_id=self.user_two['id'],
+ role_id=self.role_other['id'])
+
+ user_refs = self.identity_api.get_project_users(self.tenant_bar['id'])
+ self.assertNotIn(self.user_two['id'], [x['id'] for x in user_refs])
+
def test_authenticate_bad_user(self):
self.assertRaises(AssertionError,
self.identity_api.authenticate,
@@ -66,23 +86,25 @@ class IdentityTests(object):
def test_authenticate(self):
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
- user_id=self.user_foo['id'],
+ user_id=self.user_sna['id'],
tenant_id=self.tenant_bar['id'],
- password=self.user_foo['password'])
+ password=self.user_sna['password'])
# NOTE(termie): the password field is left in user_foo to make
# it easier to authenticate in tests, but should
# not be returned by the api
- self.user_foo.pop('password')
- self.assertDictEqual(user_ref, self.user_foo)
+ self.user_sna.pop('password')
+ self.user_sna['enabled'] = True
+ self.assertDictEqual(user_ref, self.user_sna)
self.assertDictEqual(tenant_ref, self.tenant_bar)
- self.assertDictEqual(metadata_ref, self.metadata_foobar)
+ metadata_ref.pop('roles')
+ self.assertDictEqual(metadata_ref, self.metadata_snamtu)
def test_authenticate_role_return(self):
self.identity_api.add_role_to_user_and_project(
- self.user_foo['id'], self.tenant_bar['id'], 'keystone_admin')
+ self.user_foo['id'], self.tenant_baz['id'], 'keystone_admin')
user_ref, tenant_ref, metadata_ref = self.identity_api.authenticate(
user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'],
+ tenant_id=self.tenant_baz['id'],
password=self.user_foo['password'])
self.assertIn('roles', metadata_ref)
self.assertIn('keystone_admin', metadata_ref['roles'])
@@ -105,7 +127,8 @@ class IdentityTests(object):
# it easier to authenticate in tests, but should
# not be returned by the api
user.pop('password')
- self.assertEquals(metadata_ref, {})
+ self.assertEquals(metadata_ref, {"roles":
+ [CONF.member_role_id]})
self.assertDictEqual(user_ref, user)
self.assertDictEqual(tenant_ref, self.tenant_baz)
@@ -181,9 +204,10 @@ class IdentityTests(object):
def test_get_metadata(self):
metadata_ref = self.identity_api.get_metadata(
- user_id=self.user_foo['id'],
+ user_id=self.user_sna['id'],
tenant_id=self.tenant_bar['id'])
- self.assertDictEqual(metadata_ref, self.metadata_foobar)
+ metadata_ref.pop('roles')
+ self.assertDictEqual(metadata_ref, self.metadata_snamtu)
def test_get_metadata_404(self):
# FIXME(dolph): these exceptions could be more specific
@@ -261,6 +285,59 @@ class IdentityTests(object):
'fake2',
user)
+ def test_create_duplicate_user_name_in_different_domains(self):
+ new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(new_domain['id'], new_domain)
+ user1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': DEFAULT_DOMAIN_ID,
+ 'password': uuid.uuid4().hex}
+ user2 = {'id': uuid.uuid4().hex,
+ 'name': user1['name'],
+ 'domain_id': new_domain['id'],
+ 'password': uuid.uuid4().hex}
+ self.identity_api.create_user(user1['id'], user1)
+ self.identity_api.create_user(user2['id'], user2)
+
+ def test_move_user_between_domains(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'password': uuid.uuid4().hex}
+ self.identity_api.create_user(user['id'], user)
+ user['domain_id'] = domain2['id']
+ self.identity_api.update_user(user['id'], user)
+
+ def test_move_user_between_domains_with_clashing_names_fails(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ # First, create a user in domain1
+ user1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'password': uuid.uuid4().hex}
+ self.identity_api.create_user(user1['id'], user1)
+ # Now create a user in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ user2 = {'id': uuid.uuid4().hex,
+ 'name': user1['name'],
+ 'domain_id': domain2['id'],
+ 'password': uuid.uuid4().hex}
+ self.identity_api.create_user(user2['id'], user2)
+ # Now try and move user1 into the 2nd domain - which should
+ # fail since the names clash
+ user1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_user,
+ user1['id'],
+ user1)
+
def test_rename_duplicate_user_name_fails(self):
user1 = {'id': 'fake1',
'name': 'fake1',
@@ -318,6 +395,52 @@ class IdentityTests(object):
'fake1',
tenant)
+ def test_create_duplicate_project_name_in_different_domains(self):
+ new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(new_domain['id'], new_domain)
+ tenant1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
+ 'domain_id': DEFAULT_DOMAIN_ID}
+ tenant2 = {'id': uuid.uuid4().hex, 'name': tenant1['name'],
+ 'domain_id': new_domain['id']}
+ self.identity_api.create_project(tenant1['id'], tenant1)
+ self.identity_api.create_project(tenant2['id'], tenant2)
+
+ def test_move_project_between_domains(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_project(project['id'], project)
+ project['domain_id'] = domain2['id']
+ self.identity_api.update_project(project['id'], project)
+
+ def test_move_project_between_domains_with_clashing_names_fails(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ # First, create a project in domain1
+ project1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_project(project1['id'], project1)
+ # Now create a project in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ project2 = {'id': uuid.uuid4().hex,
+ 'name': project1['name'],
+ 'domain_id': domain2['id']}
+ self.identity_api.create_project(project2['id'], project2)
+ # Now try and move project1 into the 2nd domain - which should
+ # fail since the names clash
+ project1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_project,
+ project1['id'],
+ project1)
+
def test_rename_duplicate_project_name_fails(self):
tenant1 = {'id': 'fake1', 'name': 'fake1',
'domain_id': DEFAULT_DOMAIN_ID}
@@ -421,14 +544,14 @@ class IdentityTests(object):
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
- self.assertEquals(len(roles_ref), 0)
+ self.assertEquals(len(roles_ref), 1)
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='keystone_admin')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
- self.assertDictEqual(roles_ref[0], self.role_keystone_admin)
+ self.assertDictEqual(roles_ref[1], self.role_keystone_admin)
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
@@ -475,24 +598,24 @@ class IdentityTests(object):
def test_remove_role_grant_from_user_and_project(self):
self.identity_api.create_grant(user_id=self.user_foo['id'],
- project_id=self.tenant_bar['id'],
+ project_id=self.tenant_baz['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
- project_id=self.tenant_bar['id'])
+ project_id=self.tenant_baz['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.identity_api.delete_grant(user_id=self.user_foo['id'],
- project_id=self.tenant_bar['id'],
+ project_id=self.tenant_baz['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
- project_id=self.tenant_bar['id'])
+ project_id=self.tenant_baz['id'])
self.assertEquals(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.identity_api.delete_grant,
user_id=self.user_foo['id'],
- project_id=self.tenant_bar['id'],
+ project_id=self.tenant_baz['id'],
role_id='member')
def test_get_and_remove_role_grant_by_group_and_project(self):
@@ -1115,10 +1238,10 @@ class IdentityTests(object):
role)
def test_add_user_to_project(self):
- self.identity_api.add_user_to_project(self.tenant_bar['id'],
+ self.identity_api.add_user_to_project(self.tenant_baz['id'],
self.user_foo['id'])
tenants = self.identity_api.get_projects_for_user(self.user_foo['id'])
- self.assertIn(self.tenant_bar['id'], tenants)
+ self.assertIn(self.tenant_baz['id'], tenants)
def test_add_user_to_project_404(self):
self.assertRaises(exception.ProjectNotFound,
@@ -1132,12 +1255,12 @@ class IdentityTests(object):
uuid.uuid4().hex)
def test_remove_user_from_project(self):
- self.identity_api.add_user_to_project(self.tenant_bar['id'],
+ self.identity_api.add_user_to_project(self.tenant_baz['id'],
self.user_foo['id'])
- self.identity_api.remove_user_from_project(self.tenant_bar['id'],
+ self.identity_api.remove_user_from_project(self.tenant_baz['id'],
self.user_foo['id'])
tenants = self.identity_api.get_projects_for_user(self.user_foo['id'])
- self.assertNotIn(self.tenant_bar['id'], tenants)
+ self.assertNotIn(self.tenant_baz['id'], tenants)
def test_remove_user_from_project_404(self):
self.assertRaises(exception.ProjectNotFound,
@@ -1385,7 +1508,7 @@ class IdentityTests(object):
def test_list_projects(self):
projects = self.identity_api.list_projects()
- self.assertEquals(len(projects), 2)
+ self.assertEquals(len(projects), 3)
project_ids = []
for project in projects:
project_ids.append(project.get('id'))
@@ -1615,6 +1738,62 @@ class IdentityTests(object):
self.identity_api.get_group,
group['id'])
+ def test_create_duplicate_group_name_fails(self):
+ group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,
+ 'name': uuid.uuid4().hex}
+ group2 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,
+ 'name': group1['name']}
+ self.identity_api.create_group(group1['id'], group1)
+ self.assertRaises(exception.Conflict,
+ self.identity_api.create_group,
+ group2['id'], group2)
+
+ def test_create_duplicate_group_name_in_different_domains(self):
+ new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(new_domain['id'], new_domain)
+ group1 = {'id': uuid.uuid4().hex, 'domain_id': DEFAULT_DOMAIN_ID,
+ 'name': uuid.uuid4().hex}
+ group2 = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
+ 'name': group1['name']}
+ self.identity_api.create_group(group1['id'], group1)
+ self.identity_api.create_group(group2['id'], group2)
+
+ def test_move_group_between_domains(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ group = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_group(group['id'], group)
+ group['domain_id'] = domain2['id']
+ self.identity_api.update_group(group['id'], group)
+
+ def test_move_group_between_domains_with_clashing_names_fails(self):
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain1['id'], domain1)
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.identity_api.create_domain(domain2['id'], domain2)
+ # First, create a group in domain1
+ group1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id']}
+ self.identity_api.create_group(group1['id'], group1)
+ # Now create a group in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ group2 = {'id': uuid.uuid4().hex,
+ 'name': group1['name'],
+ 'domain_id': domain2['id']}
+ self.identity_api.create_group(group2['id'], group2)
+ # Now try and move group1 into the 2nd domain - which should
+ # fail since the names clash
+ group1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_group,
+ group1['id'],
+ group1)
+
def test_project_crud(self):
project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': uuid.uuid4().hex}
diff --git a/tests/test_backend_kvs.py b/tests/test_backend_kvs.py
index 9a733ae2..e1d99d47 100644
--- a/tests/test_backend_kvs.py
+++ b/tests/test_backend_kvs.py
@@ -37,6 +37,33 @@ class KvsIdentity(test.TestCase, test_backend.IdentityTests):
# NOTE(chungg): not implemented
raise nose.exc.SkipTest('Blocked by bug 1119770')
+ def test_create_duplicate_group_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_create_duplicate_user_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_create_duplicate_project_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_user_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_user_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_group_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_group_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_project_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
+ def test_move_project_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1119770')
+
class KvsToken(test.TestCase, test_backend.TokenTests):
def setUp(self):
diff --git a/tests/test_backend_ldap.py b/tests/test_backend_ldap.py
index 8a116cfa..63499bd1 100644
--- a/tests/test_backend_ldap.py
+++ b/tests/test_backend_ldap.py
@@ -450,3 +450,33 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
def test_get_project_users(self):
raise nose.exc.SkipTest('Blocked by bug 1101287')
+
+ def test_create_duplicate_user_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_create_duplicate_project_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_create_duplicate_group_name_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1092187')
+
+ def test_create_duplicate_group_name_in_different_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_user_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_user_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_group_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_group_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_project_between_domains(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
+
+ def test_move_project_between_domains_with_clashing_names_fails(self):
+ raise nose.exc.SkipTest('Blocked by bug 1101276')
diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py
index 3863ff54..14089759 100644
--- a/tests/test_keystoneclient.py
+++ b/tests/test_keystoneclient.py
@@ -804,16 +804,19 @@ class KcMasterTestCase(CompatTestCase, KeystoneClientTests):
def test_tenant_add_and_remove_user(self):
client = self.get_client(admin=True)
- client.roles.add_user_role(tenant=self.tenant_baz['id'],
+ client.roles.add_user_role(tenant=self.tenant_bar['id'],
user=self.user_two['id'],
- role=self.role_member['id'])
- user_refs = client.tenants.list_users(tenant=self.tenant_baz['id'])
+ role=self.role_other['id'])
+ user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
self.assert_(self.user_two['id'] in [x.id for x in user_refs])
- client.roles.remove_user_role(tenant=self.tenant_baz['id'],
+ client.roles.remove_user_role(tenant=self.tenant_bar['id'],
user=self.user_two['id'],
- role=self.role_member['id'])
- user_refs = client.tenants.list_users(tenant=self.tenant_baz['id'])
- self.assert_(self.user_two['id'] not in [x.id for x in user_refs])
+ role=self.role_other['id'])
+ roles = client.roles.roles_for_user(user=self.user_foo['id'],
+ tenant=self.tenant_bar['id'])
+ self.assertNotIn(self.role_other['id'], roles)
+ user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
+ self.assertNotIn(self.user_two['id'], [x.id for x in user_refs])
def test_user_role_add_404(self):
from keystoneclient import exceptions as client_exceptions
@@ -1013,7 +1016,7 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
def test_tenant_add_and_remove_user(self):
client = self.get_client(admin=True)
- client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
+ client.roles.add_user_to_tenant(tenant_id=self.tenant_bar['id'],
user_id=self.user_two['id'],
role_id=self.role_member['id'])
role_refs = client.roles.get_user_role_refs(
@@ -1030,7 +1033,7 @@ class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
# use python's scope fall through to leave roleref_ref set
break
- client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
+ client.roles.remove_user_from_tenant(tenant_id=self.tenant_bar['id'],
user_id=self.user_two['id'],
role_id=roleref_ref.id)
diff --git a/tests/test_migrate_nova_auth.py b/tests/test_migrate_nova_auth.py
index 3a257dff..4e3e37b8 100644
--- a/tests/test_migrate_nova_auth.py
+++ b/tests/test_migrate_nova_auth.py
@@ -131,7 +131,8 @@ class MigrateNovaAuth(test.TestCase):
roles = self.identity_api.list_roles()
role_names = set([role['name'] for role in roles])
- self.assertEqual(role_names, set(['role2', 'role1', 'role3']))
+ self.assertEqual(role_names, set(['role2', 'role1', 'role3',
+ CONF.member_role_name]))
assignment_map = {
'user1': {'proj1': ['role1', 'role2']},
@@ -149,5 +150,7 @@ class MigrateNovaAuth(test.TestCase):
user['id'], tenant['id'])
actual = [self.identity_api.get_role(role_id)['name']
for role_id in roles]
+ if CONF.member_role_name in actual:
+ actual.remove(CONF.member_role_name)
expected = old_project_map.get(tenant_name, [])
self.assertEqual(set(actual), set(expected))
diff --git a/tests/test_policy.py b/tests/test_policy.py
index b4b11ad3..6895958c 100644
--- a/tests/test_policy.py
+++ b/tests/test_policy.py
@@ -19,11 +19,11 @@ import StringIO
import tempfile
import urllib2
-from keystone.common import policy as common_policy
from keystone import config
from keystone import exception
-from keystone.policy.backends import rules
from keystone import test
+from keystone.openstack.common import policy as common_policy
+from keystone.policy.backends import rules
CONF = config.CONF
@@ -51,7 +51,7 @@ class PolicyFileTestCase(test.TestCase):
policyfile.write("""{"example:test": ["false:false"]}""")
# NOTE(vish): reset stored policy cache so we don't have to sleep(1)
rules._POLICY_CACHE = {}
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
empty_credentials, action, self.target)
@@ -61,7 +61,7 @@ class PolicyTestCase(test.TestCase):
rules.reset()
# NOTE(vish): preload rules to circumvent reloading from file
rules.init()
- brain = {
+ self.rules = {
"true": [],
"example:allowed": [],
"example:denied": [["false:false"]],
@@ -73,23 +73,30 @@ class PolicyTestCase(test.TestCase):
"example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
"example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
}
- # NOTE(vish): then overload underlying brain
- common_policy.set_brain(common_policy.HttpBrain(brain))
+
+ # NOTE(vish): then overload underlying policy engine
+ self._set_rules()
self.credentials = {}
self.target = {}
+ def _set_rules(self):
+ these_rules = common_policy.Rules(
+ dict((k, common_policy.parse_rule(v))
+ for k, v in self.rules.items()))
+ common_policy.set_rules(these_rules)
+
def tearDown(self):
rules.reset()
super(PolicyTestCase, self).tearDown()
def test_enforce_nonexistent_action_throws(self):
action = "example:noexist"
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, action, self.target)
def test_enforce_bad_action_throws(self):
action = "example:denied"
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, action, self.target)
def test_enforce_good_action(self):
@@ -105,7 +112,7 @@ class PolicyTestCase(test.TestCase):
action = "example:get_http"
target = {}
result = rules.enforce(self.credentials, action, target)
- self.assertEqual(result, None)
+ self.assertTrue(result)
def test_enforce_http_false(self):
@@ -114,7 +121,7 @@ class PolicyTestCase(test.TestCase):
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
action = "example:get_http"
target = {}
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, action, target)
def test_templatized_enforcement(self):
@@ -123,12 +130,12 @@ class PolicyTestCase(test.TestCase):
credentials = {'project_id': 'fake', 'roles': []}
action = "example:my_file"
rules.enforce(credentials, action, target_mine)
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
credentials, action, target_not_mine)
def test_early_AND_enforcement(self):
action = "example:early_and_fail"
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, action, self.target)
def test_early_OR_enforcement(self):
@@ -151,30 +158,31 @@ class DefaultPolicyTestCase(test.TestCase):
rules.reset()
rules.init()
- self.brain = {
+ self.rules = {
"default": [],
"example:exist": [["false:false"]]
}
-
- self._set_brain('default')
+ self._set_rules('default')
self.credentials = {}
- def _set_brain(self, default_rule):
- brain = common_policy.HttpBrain(self.brain, default_rule)
- common_policy.set_brain(brain)
+ def _set_rules(self, default_rule):
+ these_rules = common_policy.Rules(
+ dict((k, common_policy.parse_rule(v))
+ for k, v in self.rules.items()), default_rule)
+ common_policy.set_rules(these_rules)
def tearDown(self):
super(DefaultPolicyTestCase, self).setUp()
rules.reset()
def test_policy_called(self):
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, "example:exist", {})
def test_not_found_policy_calls_default(self):
rules.enforce(self.credentials, "example:noexist", {})
def test_default_not_found(self):
- self._set_brain("default_noexist")
- self.assertRaises(exception.Forbidden, rules.enforce,
+ self._set_rules("default_noexist")
+ self.assertRaises(exception.ForbiddenAction, rules.enforce,
self.credentials, "example:noexist", {})
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index d204492c..85ea7580 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -56,9 +56,9 @@ class SqlUpgradeTests(test.TestCase):
self.config([test.etcdir('keystone.conf.sample'),
test.testsdir('test_overrides.conf'),
test.testsdir('backend_sql.conf')])
+ self.base = sql.Base()
# create and share a single sqlalchemy engine for testing
- self.base = sql.Base()
self.engine = self.base.get_engine(allow_global_engine=False)
self.Session = self.base.get_sessionmaker(engine=self.engine,
autocommit=False)
@@ -107,7 +107,7 @@ class SqlUpgradeTests(test.TestCase):
actual_cols = [col.name for col in table.columns]
self.assertEqual(expected_cols, actual_cols, '%s table' % table_name)
- def test_upgrade_0_to_1(self):
+ def test_upgrade_add_initial_tables(self):
self.upgrade(1)
self.assertTableColumns("user", ["id", "name", "extra"])
self.assertTableColumns("tenant", ["id", "name", "extra"])
@@ -117,7 +117,7 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableColumns("metadata", ["user_id", "tenant_id", "data"])
self.populate_user_table()
- def test_upgrade_5_to_6(self):
+ def test_upgrade_add_policy(self):
self.upgrade(5)
self.assertTableDoesNotExist('policy')
@@ -125,7 +125,7 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableExists('policy')
self.assertTableColumns('policy', ['id', 'type', 'blob', 'extra'])
- def test_upgrade_8_to_10(self):
+ def test_upgrade_normalize_identity(self):
self.upgrade(8)
self.populate_user_table()
self.populate_tenant_table()
@@ -179,7 +179,7 @@ class SqlUpgradeTests(test.TestCase):
session.commit()
session.close()
- def test_upgrade_10_to_13(self):
+ def test_upgrade_endpoints(self):
self.upgrade(10)
service_extra = {
'name': uuid.uuid4().hex,
@@ -266,7 +266,7 @@ class SqlUpgradeTests(test.TestCase):
self.downgrade(14)
self.assertTenantTables()
- def test_upgrade_13_to_14(self):
+ def test_upgrade_add_group_tables(self):
self.upgrade(13)
self.upgrade(14)
self.assertTableExists('group')
@@ -279,6 +279,7 @@ class SqlUpgradeTests(test.TestCase):
self.populate_user_table(with_pass_enab=True)
self.populate_tenant_table(with_desc_enab=True)
self.upgrade(16)
+
self.assertTableColumns("user",
["id", "name", "extra",
"password", "enabled", "domain_id"])
@@ -299,9 +300,12 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(a_project.description,
default_fixtures.TENANTS[1]['description'])
self.assertEqual(a_project.domain_id, DEFAULT_DOMAIN_ID)
+
session.commit()
session.close()
+ self.check_uniqueness_constraints()
+
def test_downgrade_16_to_14(self):
self.upgrade(16)
self.populate_user_table(with_pass_enab_domain=True)
@@ -327,7 +331,7 @@ class SqlUpgradeTests(test.TestCase):
session.commit()
session.close()
- def test_downgrade_14_to_13(self):
+ def test_downgrade_remove_group_tables(self):
self.upgrade(14)
self.downgrade(13)
self.assertTableDoesNotExist('group')
@@ -335,7 +339,7 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableDoesNotExist('group_domain_metadata')
self.assertTableDoesNotExist('user_group_membership')
- def test_downgrade_13_to_10(self):
+ def test_downgrade_endpoints(self):
self.upgrade(13)
service_extra = {
@@ -420,7 +424,7 @@ class SqlUpgradeTests(test.TestCase):
"metadata"]:
self.assertTableDoesNotExist(table_name)
- def test_upgrade_6_to_7(self):
+ def test_upgrade_add_domain_tables(self):
self.upgrade(6)
self.assertTableDoesNotExist('credential')
self.assertTableDoesNotExist('domain')
@@ -436,6 +440,92 @@ class SqlUpgradeTests(test.TestCase):
self.assertTableColumns('user_domain_metadata',
['user_id', 'domain_id', 'data'])
+ def test_upgrade_default_roles(self):
+ def count_member_roles():
+ session = self.Session()
+ query_string = ("select count(*) as c from role "
+ "where name='%s'" % config.CONF.member_role_name)
+ role_count = session.execute(query_string).fetchone()['c']
+ session.close()
+ return role_count
+
+ self.upgrade(16)
+ self.assertEquals(0, count_member_roles())
+ self.upgrade(17)
+ self.assertEquals(1, count_member_roles())
+ self.downgrade(16)
+ self.assertEquals(0, count_member_roles())
+
+ def check_uniqueness_constraints(self):
+ # Check uniqueness constraints for User & Project tables are
+ # correct following schema modification. The Group table's
+ # schema is never modified, so we don't bother to check that.
+ domain_table = sqlalchemy.Table('domain',
+ self.metadata,
+ autoload=True)
+ domain1 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ domain2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True}
+ cmd = domain_table.insert().values(domain1)
+ self.engine.execute(cmd)
+ cmd = domain_table.insert().values(domain2)
+ self.engine.execute(cmd)
+
+ # First, the User table.
+ this_table = sqlalchemy.Table('user',
+ self.metadata,
+ autoload=True)
+ user = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ cmd = this_table.insert().values(user)
+ self.engine.execute(cmd)
+ # now insert a user with the same name into a different
+ # domain - which should work.
+ user['id'] = uuid.uuid4().hex
+ user['domain_id'] = domain2['id']
+ cmd = this_table.insert().values(user)
+ self.engine.execute(cmd)
+ # TODO(henry-nash). For now, as part of clean-up we
+ # delete one of these users. Although not part of this test,
+ # unless we do so the downgrade(16->15) that is part of
+ # teardown with fail due to having two uses with clashing
+ # name as we try to revert to a single global name space. This
+ # limitation is raised as Bug #1125046 and the delete
+ # could be removed depending on how that bug is resolved.
+ cmd = this_table.delete(id=user['id'])
+ self.engine.execute(cmd)
+
+ # Now, the Project table.
+ this_table = sqlalchemy.Table('project',
+ self.metadata,
+ autoload=True)
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain1['id'],
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ cmd = this_table.insert().values(project)
+ self.engine.execute(cmd)
+ # now insert a project with the same name into a different
+ # domain - which should work.
+ project['id'] = uuid.uuid4().hex
+ project['domain_id'] = domain2['id']
+ cmd = this_table.insert().values(project)
+ self.engine.execute(cmd)
+ # TODO(henry-nash) For now, we delete one of the projects for
+ # the same reason as we delete one of the users (Bug #1125046).
+ # This delete could be removed depending on that bug resolution.
+ cmd = this_table.delete(id=project['id'])
+ self.engine.execute(cmd)
+
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user
diff --git a/tests/test_url_middleware.py b/tests/test_url_middleware.py
index 19f6141c..d5c2eef1 100644
--- a/tests/test_url_middleware.py
+++ b/tests/test_url_middleware.py
@@ -19,8 +19,6 @@ import webob
from keystone import middleware
from keystone import test
-import test_auth_token_middleware as test_atm
-
class FakeApp(object):
"""Fakes a WSGI app URL normalized."""
@@ -33,7 +31,6 @@ class FakeApp(object):
class UrlMiddlewareTest(test.TestCase):
def setUp(self):
self.middleware = middleware.NormalizingFilter(FakeApp())
- self.middleware.http_client_class = test_atm.FakeHTTPConnection
self.response_status = None
self.response_headers = None
super(UrlMiddlewareTest, self).setUp()
diff --git a/tests/test_v3.py b/tests/test_v3.py
index 28475e14..f5602035 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -185,7 +185,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
keys = ['name', 'description', 'enabled']
for k in ['id'] + keys:
- msg = '%s unnexpectedly None in %s' % (k, entity)
+ msg = '%s unexpectedly None in %s' % (k, entity)
self.assertIsNotNone(entity.get(k), msg)
self.assertIsNotNone(entity.get('links'))