summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlvaro Lopez Garcia <aloga@ifca.unican.es>2012-11-05 10:56:40 +0100
committerAlvaro Lopez Garcia <aloga@ifca.unican.es>2012-11-15 10:04:47 +0100
commitbe754ff2bc05a0d262469edd7ce8fac19d457231 (patch)
treecf33aa941544261f8d9206c0ea088ef6fd036777
parent2b83d4da7aca79c48f2789930bd5790bf993a606 (diff)
downloadkeystone-be754ff2bc05a0d262469edd7ce8fac19d457231.tar.gz
keystone-be754ff2bc05a0d262469edd7ce8fac19d457231.tar.xz
keystone-be754ff2bc05a0d262469edd7ce8fac19d457231.zip
Refactor TokenController.authenticate() method.
Change-Id: I29710f749c67cf83ccad12deee54fe6b71dd53b8
-rw-r--r--keystone/service.py386
-rw-r--r--tests/test_keystoneclient.py8
-rw-r--r--tests/test_service.py280
3 files changed, 444 insertions, 230 deletions
diff --git a/keystone/service.py b/keystone/service.py
index f31229bf..21d9303c 100644
--- a/keystone/service.py
+++ b/keystone/service.py
@@ -414,6 +414,10 @@ class NoopController(wsgi.Application):
return {}
+class ExternalAuthNotApplicable(Exception):
+ """External authentication is not applicable"""
+
+
class TokenController(wsgi.Application):
def __init__(self):
self.catalog_api = catalog.Manager()
@@ -455,182 +459,48 @@ class TokenController(wsgi.Application):
Alternatively, this call accepts auth with only a token and tenant
that will return a token that is scoped to that tenant.
"""
- if not auth:
- raise exception.ValidationError(attribute='auth',
- target='request body')
if auth is None:
- auth = {}
- remote_auth = False
- if 'REMOTE_USER' in context and not 'token' in auth:
- # authenticated external request
- remote_auth = True
-
- if 'passwordCredentials' not in auth:
- auth['passwordCredentials'] = {}
- auth['passwordCredentials']['username'] = context.get(
- 'REMOTE_USER', None)
-
- if 'passwordCredentials' in auth:
- user_id = auth['passwordCredentials'].get('userId', None)
- username = auth['passwordCredentials'].get('username', '')
- password = auth['passwordCredentials'].get('password', '')
- tenant_name = auth.get('tenantName', None)
-
- if not user_id and not username:
- raise exception.ValidationError(
- attribute='username or userId',
- target='passwordCredentials')
-
- tenant_ref = None
- user_ref = None
- metadata_ref = {}
-
- if username:
- try:
- user_ref = self.identity_api.get_user_by_name(
- context=context, user_name=username)
- user_id = user_ref['id']
- except exception.UserNotFound:
- raise exception.Unauthorized()
-
- if not password and not remote_auth:
- raise exception.ValidationError(
- attribute='password',
- target='passwordCredentials')
-
- # more compat
- tenant_id = auth.get('tenantId', None)
- if tenant_name:
- try:
- tenant_ref = self.identity_api.get_tenant_by_name(
- context=context, tenant_name=tenant_name)
- tenant_id = tenant_ref['id']
- except exception.TenantNotFound:
- raise exception.Unauthorized()
-
- try:
- if not remote_auth:
- # local identity authentication required
- auth_info = self.identity_api.authenticate(
- context=context,
- user_id=user_id,
- password=password,
- tenant_id=tenant_id)
- (user_ref, tenant_ref, metadata_ref) = auth_info
- else:
- # remote authentication already performed
- if not user_ref:
- user_ref = self.identity_api.get_user(
- self.identity_api,
- user_id)
- if tenant_id:
- if not tenant_ref:
- tenant_ref = self.identity_api.get_tenant(
- self.identity_api,
- tenant_id)
- metadata_ref = self.identity_api.get_metadata(
- self.identity_api,
- user_id,
- tenant_id)
- auth_info = (user_ref, tenant_ref, metadata_ref)
-
- # If the user is disabled don't allow them to authenticate
- if not user_ref.get('enabled', True):
- LOG.warning('User %s is disabled' % user_id)
- raise exception.Unauthorized()
-
- # If the tenant is disabled don't allow them to authenticate
- if tenant_ref and not tenant_ref.get('enabled', True):
- LOG.warning('Tenant %s is disabled' % tenant_id)
- raise exception.Unauthorized()
- except AssertionError as e:
- raise exception.Unauthorized(str(e))
- auth_token_data = dict(zip(['user', 'tenant', 'metadata'],
- auth_info))
- expiry = self.token_api._get_default_expire_time(context=context)
-
- if tenant_ref:
- catalog_ref = self.catalog_api.get_catalog(
- context=context,
- user_id=user_ref['id'],
- tenant_id=tenant_ref['id'],
- metadata=metadata_ref)
- else:
- catalog_ref = {}
- elif 'token' in auth:
- old_token = auth['token'].get('id', None)
- tenant_name = auth.get('tenantName')
-
- try:
- old_token_ref = self.token_api.get_token(context=context,
- token_id=old_token)
- except exception.NotFound:
- LOG.warning("Token not found: " + str(old_token))
- raise exception.Unauthorized()
-
- user_ref = old_token_ref['user']
- user_id = user_ref['id']
-
- current_user_ref = self.identity_api.get_user(context=context,
- user_id=user_id)
-
- # If the user is disabled don't allow them to authenticate
- if not current_user_ref.get('enabled', True):
- LOG.warning('User %s is disabled' % user_id)
- raise exception.Unauthorized()
-
- if tenant_name:
- tenant_ref = self.identity_api.get_tenant_by_name(
- context=context,
- tenant_name=tenant_name)
- tenant_id = tenant_ref['id']
- else:
- tenant_id = auth.get('tenantId', None)
- tenants = self.identity_api.get_tenants_for_user(context, user_id)
+ raise exception.ValidationError(attribute='auth',
+ target='request body')
- if tenant_id:
- if not tenant_id in tenants:
- LOG.warning('User %s is unauthorized for tenant %s'
- % (user_id, tenant_id))
- raise exception.Unauthorized()
+ auth_token_data = None
- expiry = old_token_ref['expires']
+ if "token" in auth:
+ # Try to authenticate using a token
+ auth_token_data, auth_info = self._authenticate_token(
+ context, auth)
+ else:
+ # Try external authentication
try:
- tenant_ref = self.identity_api.get_tenant(context=context,
- tenant_id=tenant_id)
- except exception.TenantNotFound:
- tenant_ref = None
- metadata_ref = {}
- catalog_ref = {}
- except exception.MetadataNotFound:
- metadata_ref = {}
- catalog_ref = {}
-
- # If the tenant is disabled don't allow them to authenticate
- if tenant_ref and not tenant_ref.get('enabled', True):
- LOG.warning('Tenant %s is disabled' % tenant_id)
- raise exception.Unauthorized()
-
- if tenant_ref:
- metadata_ref = self.identity_api.get_metadata(
- context=context,
- user_id=user_ref['id'],
- tenant_id=tenant_ref['id'])
- catalog_ref = self.catalog_api.get_catalog(
- context=context,
- user_id=user_ref['id'],
- tenant_id=tenant_ref['id'],
- metadata=metadata_ref)
-
- auth_token_data = dict(dict(user=current_user_ref,
- tenant=tenant_ref,
- metadata=metadata_ref))
+ auth_token_data, auth_info = self._authenticate_external(
+ context, auth)
+ except ExternalAuthNotApplicable:
+ # Try local authentication
+ auth_token_data, auth_info = self._authenticate_local(
+ context, auth)
+
+ user_ref, tenant_ref, metadata_ref = auth_info
+
+ # If the user is disabled don't allow them to authenticate
+ if not user_ref.get('enabled', True):
+ LOG.warning('User %s is disabled' % user_ref["id"])
+ raise exception.Unauthorized()
+
+ # If the tenant is disabled don't allow them to authenticate
+ if tenant_ref and not tenant_ref.get('enabled', True):
+ LOG.warning('Tenant %s is disabled' % tenant_ref["id"])
+ raise exception.Unauthorized()
+
+ if tenant_ref:
+ catalog_ref = self.catalog_api.get_catalog(
+ context=context,
+ user_id=user_ref['id'],
+ tenant_id=tenant_ref['id'],
+ metadata=metadata_ref)
else:
- raise exception.ValidationError(
- attribute='passwordCredentials or token', target='auth')
+ catalog_ref = {}
- auth_token_data['expires'] = expiry
auth_token_data['id'] = 'placeholder'
roles_ref = []
@@ -646,7 +516,6 @@ class TokenController(wsgi.Application):
if config.CONF.signing.token_format == 'UUID':
token_id = uuid.uuid4().hex
elif config.CONF.signing.token_format == 'PKI':
-
token_id = cms.cms_sign_token(json.dumps(token_data),
config.CONF.signing.certfile,
config.CONF.signing.keyfile)
@@ -675,6 +544,183 @@ class TokenController(wsgi.Application):
return token_data
+ def _authenticate_token(self, context, auth):
+ """Try to authenticate using an already existing token.
+
+ Returns auth_token_data, (user_ref, tenant_ref, metadata_ref)
+ """
+ if 'token' not in auth:
+ raise exception.ValidationError(
+ attribute='token', target='auth')
+
+ if "id" not in auth['token']:
+ raise exception.ValidationError(
+ attribute="id", target="token")
+
+ old_token = auth['token']['id']
+
+ try:
+ old_token_ref = self.token_api.get_token(context=context,
+ token_id=old_token)
+ except exception.NotFound:
+ LOG.warning("Token not found: " + str(old_token))
+ raise exception.Unauthorized()
+
+ user_ref = old_token_ref['user']
+ user_id = user_ref['id']
+
+ current_user_ref = self.identity_api.get_user(context=context,
+ user_id=user_id)
+
+ tenant_id = self._get_tenant_id_from_auth(context, auth)
+
+ tenant_ref = self._get_tenant_ref(context, user_id, tenant_id)
+ metadata_ref = self._get_metadata_ref(context, user_id, tenant_id)
+
+ expiry = old_token_ref['expires']
+ auth_token_data = self._get_auth_token_data(current_user_ref,
+ tenant_ref,
+ metadata_ref,
+ expiry)
+
+ return auth_token_data, (current_user_ref, tenant_ref, metadata_ref)
+
+ def _authenticate_local(self, context, auth):
+ """Try to authenticate against the identity backend.
+
+ Returns auth_token_data, (user_ref, tenant_ref, metadata_ref)
+ """
+ if 'passwordCredentials' not in auth:
+ raise exception.ValidationError(
+ attribute='passwordCredentials', target='auth')
+
+ if "password" not in auth['passwordCredentials']:
+ raise exception.ValidationError(
+ attribute='password', target='passwordCredentials')
+
+ password = auth['passwordCredentials']['password']
+
+ if ("userId" not in auth['passwordCredentials'] and
+ "username" not in auth['passwordCredentials']):
+ raise exception.ValidationError(
+ attribute='username or userId',
+ target='passwordCredentials')
+
+ user_id = auth['passwordCredentials'].get('userId', None)
+ username = auth['passwordCredentials'].get('username', '')
+
+ if username:
+ try:
+ user_ref = self.identity_api.get_user_by_name(
+ context=context, user_name=username)
+ user_id = user_ref['id']
+ except exception.UserNotFound:
+ LOG.warn("User not found: %s" % user_id)
+ raise exception.Unauthorized()
+
+ tenant_id = self._get_tenant_id_from_auth(context, auth)
+
+ try:
+ auth_info = self.identity_api.authenticate(
+ context=context,
+ user_id=user_id,
+ password=password,
+ tenant_id=tenant_id)
+ except AssertionError as e:
+ raise exception.Unauthorized(str(e))
+ (user_ref, tenant_ref, metadata_ref) = auth_info
+
+ expiry = self.token_api._get_default_expire_time(context=context)
+ auth_token_data = self._get_auth_token_data(user_ref,
+ tenant_ref,
+ metadata_ref,
+ expiry)
+
+ return auth_token_data, (user_ref, tenant_ref, metadata_ref)
+
+ def _authenticate_external(self, context, auth):
+ """Try to authenticate an external user via REMOTE_USER variable.
+
+ Returns auth_token_data, (user_ref, tenant_ref, metadata_ref)
+ """
+ if 'REMOTE_USER' not in context:
+ raise ExternalAuthNotApplicable()
+
+ username = context['REMOTE_USER']
+ try:
+ user_ref = self.identity_api.get_user_by_name(
+ context=context, user_name=username)
+ user_id = user_ref['id']
+ except exception.UserNotFound:
+ LOG.warn("User not found: %s" % username)
+ raise exception.Unauthorized()
+
+ tenant_id = self._get_tenant_id_from_auth(context, auth)
+
+ tenant_ref = self._get_tenant_ref(context, user_id, tenant_id)
+ metadata_ref = self._get_metadata_ref(context, user_id, tenant_id)
+
+ expiry = self.token_api._get_default_expire_time(context=context)
+ auth_token_data = self._get_auth_token_data(user_ref,
+ tenant_ref,
+ metadata_ref,
+ expiry)
+
+ return auth_token_data, (user_ref, tenant_ref, metadata_ref)
+
+ def _get_auth_token_data(self, user, tenant, metadata, expiry):
+ return dict(dict(user=user,
+ tenant=tenant,
+ metadata=metadata,
+ expires=expiry))
+
+ def _get_tenant_id_from_auth(self, context, auth):
+ """Extract tenant information from auth dict.
+
+ Returns a valid tenant_id if it exists, or None if not specified.
+ """
+ tenant_id = auth.get('tenantId', None)
+ tenant_name = auth.get('tenantName', None)
+ if tenant_name:
+ try:
+ tenant_ref = self.identity_api.get_tenant_by_name(
+ context=context, tenant_name=tenant_name)
+ tenant_id = tenant_ref['id']
+ except exception.TenantNotFound:
+ raise exception.Unauthorized()
+ return tenant_id
+
+ def _get_tenant_ref(self, context, user_id, tenant_id):
+ """Returns the tenant_ref for the user's tenant"""
+ tenant_ref = None
+ if tenant_id:
+ tenants = self.identity_api.get_tenants_for_user(context, user_id)
+ if tenant_id not in tenants:
+ LOG.warning('User %s is unauthorized for tenant %s'
+ % (user_id, tenant_id))
+ raise exception.Unauthorized()
+
+ try:
+ tenant_ref = self.identity_api.get_tenant(context=context,
+ tenant_id=tenant_id)
+ except exception.TenantNotFound:
+ exception.Unauthorized()
+ return tenant_ref
+
+ def _get_metadata_ref(self, context, user_id, tenant_id):
+ """Returns the metadata_ref for a user in a tenant"""
+ metadata_ref = {}
+ if tenant_id:
+ try:
+ metadata_ref = self.identity_api.get_metadata(
+ context=context,
+ user_id=user_id,
+ tenant_id=tenant_id)
+ except exception.MetadataNotFound:
+ metadata_ref = {}
+
+ return metadata_ref
+
def _get_token_ref(self, context, token_id, belongs_to=None):
"""Returns a token if a valid one exists.
diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py
index 9c2c41e9..1eb5be44 100644
--- a/tests/test_keystoneclient.py
+++ b/tests/test_keystoneclient.py
@@ -141,6 +141,14 @@ class KeystoneClientTests(object):
self._client, token=token,
tenant_id=uuid.uuid4().hex)
+ def test_authenticate_token_invalid_tenant_name(self):
+ from keystoneclient import exceptions as client_exceptions
+ client = self.get_client()
+ token = client.auth_token
+ self.assertRaises(client_exceptions.Unauthorized,
+ self._client, token=token,
+ tenant_name=uuid.uuid4().hex)
+
def test_authenticate_token_tenant_name(self):
client = self.get_client()
token = client.auth_token
diff --git a/tests/test_service.py b/tests/test_service.py
index ef5c1f43..ce277c5c 100644
--- a/tests/test_service.py
+++ b/tests/test_service.py
@@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import uuid
+
import default_fixtures
from keystone import exception
@@ -22,37 +24,77 @@ from keystone.identity.backends import kvs as kvs_identity
from keystone.openstack.common import timeutils
-class FakeIdentityManager(object):
- def get_user_by_name(self, context, user_name):
- return {'id': 1, 'name': 'test', 'extra': ''}
+def _build_user_auth(token=None, username=None,
+ password=None, tenant_name=None):
+ """Build auth dictionary.
+
+ It will create an auth dictionary based on all the arguments
+ that it receives.
+ """
+ auth_json = {}
+ if token is not None:
+ auth_json['token'] = token
+ if username or password:
+ auth_json['passwordCredentials'] = {}
+ if username is not None:
+ auth_json['passwordCredentials']['username'] = username
+ if password is not None:
+ auth_json['passwordCredentials']['password'] = password
+ if tenant_name is not None:
+ auth_json['tenantName'] = tenant_name
+ return auth_json
class TokenControllerTest(test.TestCase):
def setUp(self):
super(TokenControllerTest, self).setUp()
- self.stubs.Set(identity, 'Manager', FakeIdentityManager)
+ self.identity_api = kvs_identity.Identity()
+ self.load_fixtures(default_fixtures)
self.api = service.TokenController()
- def test_authenticate_blank_password_credentials(self):
- """Verify sending empty json dict as passwordCredentials raises the
- right exception."""
- body_dict = {'passwordCredentials': {}, 'tenantName': 'demo'}
- self.assertRaises(exception.ValidationError, self.api.authenticate,
- {}, body_dict)
+ def assertEqualTokens(self, a, b):
+ """Assert that two tokens are equal.
- def test_authenticate_no_username(self):
- """Verify skipping username raises the right exception."""
- body_dict = {'passwordCredentials': {'password': 'pass'},
- 'tenantName': 'demo'}
- self.assertRaises(exception.ValidationError, self.api.authenticate,
- {}, body_dict)
+ Compare two tokens except for their ids. This also truncates
+ the time in the comparison.
+ """
+ def normalize(token):
+ token['access']['token']['id'] = 'dummy'
+ # truncate to eliminate timing problems
+ issued = token['access']['token']['issued_at']
+ token['access']['token']['issued_at'] = issued[:-8]
+ # truncate to eliminate timing problems
+ expires = token['access']['token']['expires']
+ token['access']['token']['expires'] = expires[:-3]
+ return token
+ return self.assertDictEqual(normalize(a), normalize(b))
- def test_authenticate_no_password(self):
- """Verify skipping password raises the right exception."""
- body_dict = {'passwordCredentials': {'username': 'user1'},
- 'tenantName': 'demo'}
- self.assertRaises(exception.ValidationError, self.api.authenticate,
- {}, body_dict)
+
+class AuthBadRequests(TokenControllerTest):
+ def setUp(self):
+ super(AuthBadRequests, self).setUp()
+
+ def test_no_external_auth(self):
+ """Verify that _authenticate_external() raises exception if
+ not applicable"""
+ self.assertRaises(
+ service.ExternalAuthNotApplicable,
+ self.api._authenticate_external,
+ {}, {})
+
+ def test_no_token_in_auth(self):
+ """Verity that _authenticate_token() raises exception if no token"""
+ self.assertRaises(
+ exception.ValidationError,
+ self.api._authenticate_token,
+ None, {})
+
+ def test_no_credentials_in_auth(self):
+ """Verity that _authenticate_local() raises exception if no creds"""
+ self.assertRaises(
+ exception.ValidationError,
+ self.api._authenticate_local,
+ None, {})
def test_authenticate_blank_request_body(self):
"""Verify sending empty json dict raises the right exception."""
@@ -61,8 +103,9 @@ class TokenControllerTest(test.TestCase):
def test_authenticate_blank_auth(self):
"""Verify sending blank 'auth' raises the right exception."""
+ body_dict = _build_user_auth()
self.assertRaises(exception.ValidationError, self.api.authenticate,
- {}, {'auth': {}})
+ {}, body_dict)
def test_authenticate_invalid_auth_content(self):
"""Verify sending invalid 'auth' raises the right exception."""
@@ -70,48 +113,141 @@ class TokenControllerTest(test.TestCase):
{}, {'auth': 'abcd'})
-class RemoteUserTest(test.TestCase):
+class AuthWithToken(TokenControllerTest):
def setUp(self):
- super(RemoteUserTest, self).setUp()
- self.identity_api = kvs_identity.Identity()
- self.load_fixtures(default_fixtures)
- self.api = service.TokenController()
+ super(AuthWithToken, self).setUp()
- def _build_user_auth(self, username, passwd, tenant):
- auth_json = {'passwordCredentials': {}}
- if username is not None:
- auth_json['passwordCredentials']['username'] = username
- if passwd is not None:
- auth_json['passwordCredentials']['password'] = passwd
- if tenant is not None:
- auth_json['tenantName'] = tenant
- return auth_json
+ def test_unscoped_token(self):
+ """Verify getting an unscoped token with password creds"""
+ body_dict = _build_user_auth(username='FOO',
+ password='foo2')
+ unscoped_token = self.api.authenticate({}, body_dict)
+ tenant = unscoped_token["access"]["token"].get("tenant", None)
+ self.assertEqual(tenant, None)
- def assertEqualTokens(self, a, b):
- def normalize(token):
- token['access']['token']['id'] = 'dummy'
- del token['access']['token']['expires']
- del token['access']['token']['issued_at']
- return token
+ def test_auth_invalid_token(self):
+ """Verify exception is raised if invalid token"""
+ body_dict = _build_user_auth(token={"id": uuid.uuid4().hex})
+ self.assertRaises(
+ exception.Unauthorized,
+ self.api.authenticate,
+ {}, body_dict)
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(a['access']['token']['expires']),
- timeutils.parse_isotime(b['access']['token']['expires']))
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(a['access']['token']['issued_at']),
- timeutils.parse_isotime(b['access']['token']['issued_at']))
- return self.assertDictEqual(normalize(a), normalize(b))
+ def test_auth_bad_formatted_token(self):
+ """Verify exception is raised if invalid token"""
+ body_dict = _build_user_auth(token={})
+ self.assertRaises(
+ exception.ValidationError,
+ self.api.authenticate,
+ {}, body_dict)
+
+ def test_auth_unscoped_token_no_tenant(self):
+ """Verify getting an unscoped token with an unscoped token"""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
+ unscoped_token = self.api.authenticate({}, body_dict)
+
+ body_dict = _build_user_auth(
+ token=unscoped_token["access"]["token"])
+ unscoped_token_2 = self.api.authenticate({}, body_dict)
+
+ self.assertEqualTokens(unscoped_token, unscoped_token_2)
+
+ def test_auth_unscoped_token_tenant(self):
+ """Verify getting a token in a tenant with an unscoped token"""
+ # Get an unscoped tenant
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
+ unscoped_token = self.api.authenticate({}, body_dict)
+ # Get a token on BAR tenant using the unscoped tenant
+ body_dict = _build_user_auth(
+ token=unscoped_token["access"]["token"],
+ tenant_name="BAR")
+ scoped_token = self.api.authenticate({}, body_dict)
+
+ tenant = scoped_token["access"]["token"]["tenant"]
+ self.assertEquals(tenant["id"], self.tenant_bar['id'])
+
+
+class AuthWithPasswordCredentials(TokenControllerTest):
+ def setUp(self):
+ super(AuthWithPasswordCredentials, self).setUp()
+
+ def test_auth_invalid_user(self):
+ """Verify exception is raised if invalid user"""
+ body_dict = _build_user_auth(
+ username=uuid.uuid4().hex,
+ password=uuid.uuid4().hex)
+ self.assertRaises(
+ exception.Unauthorized,
+ self.api.authenticate,
+ {}, body_dict)
+
+ def test_auth_valid_user_invalid_password(self):
+ """Verify exception is raised if invalid password"""
+ body_dict = _build_user_auth(
+ username="FOO",
+ password=uuid.uuid4().hex)
+ self.assertRaises(
+ exception.Unauthorized,
+ self.api.authenticate,
+ {}, body_dict)
+
+ def test_auth_empty_password(self):
+ """Verify exception is raised if empty password"""
+ body_dict = _build_user_auth(
+ username="FOO",
+ password="")
+ self.assertRaises(
+ exception.Unauthorized,
+ self.api.authenticate,
+ {}, body_dict)
+
+ def test_auth_no_password(self):
+ """Verify exception is raised if empty password"""
+ body_dict = _build_user_auth(username="FOO")
+ self.assertRaises(
+ exception.ValidationError,
+ self.api.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_blank_password_credentials(self):
+ """Verify sending empty json dict as passwordCredentials raises the
+ right exception."""
+ body_dict = {'passwordCredentials': {}, 'tenantName': 'demo'}
+ self.assertRaises(exception.ValidationError, self.api.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_no_username(self):
+ """Verify skipping username raises the right exception."""
+ body_dict = _build_user_auth(password="pass",
+ tenant_name="demo")
+ self.assertRaises(exception.ValidationError, self.api.authenticate,
+ {}, body_dict)
+
+
+class AuthWithRemoteUser(TokenControllerTest):
+ def setUp(self):
+ super(AuthWithRemoteUser, self).setUp()
def test_unscoped_remote_authn(self):
+ """Verify getting an unscoped token with external authn"""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
local_token = self.api.authenticate(
- {},
- self._build_user_auth('FOO', 'foo2', None))
+ {}, body_dict)
+
+ body_dict = _build_user_auth()
remote_token = self.api.authenticate(
- {'REMOTE_USER': 'FOO'},
- self._build_user_auth('FOO', 'nosir', None))
+ {'REMOTE_USER': 'FOO'}, body_dict)
+
self.assertEqualTokens(local_token, remote_token)
def test_unscoped_remote_authn_jsonless(self):
+ """Verify that external auth with invalid request fails"""
self.assertRaises(
exception.ValidationError,
self.api.authenticate,
@@ -119,17 +255,41 @@ class RemoteUserTest(test.TestCase):
None)
def test_scoped_remote_authn(self):
+ """Verify getting a token with external authn"""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name='BAR')
local_token = self.api.authenticate(
- {},
- self._build_user_auth('FOO', 'foo2', 'BAR'))
+ {}, body_dict)
+
+ body_dict = _build_user_auth(
+ tenant_name='BAR')
remote_token = self.api.authenticate(
- {'REMOTE_USER': 'FOO'},
- self._build_user_auth('FOO', 'nosir', 'BAR'))
+ {'REMOTE_USER': 'FOO'}, body_dict)
+
+ self.assertEqualTokens(local_token, remote_token)
+
+ def test_scoped_nometa_remote_authn(self):
+ """Verify getting a token with external authn and no metadata"""
+ body_dict = _build_user_auth(
+ username='TWO',
+ password='two2',
+ tenant_name='BAZ')
+ local_token = self.api.authenticate(
+ {}, body_dict)
+
+ body_dict = _build_user_auth(tenant_name='BAZ')
+ remote_token = self.api.authenticate(
+ {'REMOTE_USER': 'TWO'}, body_dict)
+
self.assertEqualTokens(local_token, remote_token)
def test_scoped_remote_authn_invalid_user(self):
+ """Verify that external auth with invalid user fails"""
+ body_dict = _build_user_auth(tenant_name="BAR")
self.assertRaises(
exception.Unauthorized,
self.api.authenticate,
- {'REMOTE_USER': 'FOOZBALL'},
- self._build_user_auth('FOO', 'nosir', 'BAR'))
+ {'REMOTE_USER': uuid.uuid4().hex},
+ body_dict)