summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAdam Young <ayoung@redhat.com>2013-03-06 10:14:17 -0500
committerAdam Young <ayoung@redhat.com>2013-03-08 13:12:05 -0500
commita2c3636bfdebc3af3738e87fc2295dc3845913d2 (patch)
tree0e32c0cf5c6f4591c52ffe080aae400f607f0ff9
parentdd7d4fd551f727873462d3ef2ece863ab6400995 (diff)
Delete tokens for user
Bug 1152283 There was a typo in the function that showed it was untested. This fixes the typo and adds a unit test It also corrects the logic in the KVS backend to ignore the user_id field when listing tokens by trust_id Change-Id: I5325c04e53a09fce68f3d350e7502341a398aa05
-rw-r--r--keystone/identity/controllers.py22
-rw-r--r--keystone/token/backends/kvs.py60
-rw-r--r--keystone/token/backends/sql.py55
-rw-r--r--tests/test_auth.py18
4 files changed, 104 insertions, 51 deletions
diff --git a/keystone/identity/controllers.py b/keystone/identity/controllers.py
index e8472987..8a2e8dbe 100644
--- a/keystone/identity/controllers.py
+++ b/keystone/identity/controllers.py
@@ -161,22 +161,22 @@ class Tenant(controller.V2Controller):
return o
-def delete_tokens_for_user(token_api, trust_api, context, user_id, user):
+def delete_tokens_for_user(context, token_api, trust_api, user_id):
try:
#First delete tokens that could get other tokens.
for token_id in token_api.list_tokens(context, user_id):
token_api.delete_token(context, token_id)
#now delete trust tokens
- for trust_id in (trust_api.list_trusts_for_trustee(context, user_id)):
- token_list = token_api.list_tokens(context, userid,
- trust_id=trust_id)
+ for trust in trust_api.list_trusts_for_trustee(context, user_id):
+ token_list = token_api.list_tokens(context, user_id,
+ trust_id=trust['id'])
for token in token_list:
token_api.delete_token(context, token)
except exception.NotImplemented:
# The users status has been changed but tokens remain valid for
# backends that can't list tokens for users
- LOG.warning('User %s status has changed, but existing tokens '
- 'remain valid' % user_id)
+ LOG.warning(_('User %s status has changed, but existing tokens '
+ 'remain valid') % user_id)
class User(controller.V2Controller):
@@ -235,10 +235,8 @@ class User(controller.V2Controller):
if user.get('password') or not user.get('enabled', True):
# If the password was changed or the user was disabled we clear tokens
- delete_tokens_for_user(self.token_api, self.trust_api,
- context,
- user_id,
- user)
+ delete_tokens_for_user(context, self.token_api, self.trust_api,
+ user_id)
return {'user': self._filter_domain_id(user_ref)}
def delete_user(self, context, user_id):
@@ -343,8 +341,8 @@ class Role(controller.V2Controller):
context, user_id, tenant_id, role_id)
roles = self.identity_api.get_roles_for_user_and_project(
context, user_id, tenant_id)
- delete_tokens_for_user(self.token_api, self.trust_api, context,
- user_id, tenant_id)
+ delete_tokens_for_user(
+ context, self.token_api, self.trust_api, user_id)
# COMPAT(diablo): CRUD extension
def get_role_refs(self, context, user_id):
diff --git a/keystone/token/backends/kvs.py b/keystone/token/backends/kvs.py
index 0adf0579..da45b37f 100644
--- a/keystone/token/backends/kvs.py
+++ b/keystone/token/backends/kvs.py
@@ -46,7 +46,7 @@ class Token(kvs.Base, token.Driver):
if not data_copy.get('expires'):
data_copy['expires'] = token.default_expire_time()
if 'trust_id' in data and data['trust_id'] is None:
- data_copy.pop('trust_id')
+ data_copy.pop('trust_id')
self.db.set('token-%s' % token_id, data_copy)
return copy.deepcopy(data_copy)
@@ -59,32 +59,52 @@ class Token(kvs.Base, token.Driver):
except exception.NotFound:
raise exception.TokenNotFound(token_id=token_id)
- def list_tokens(self, user_id, tenant_id=None, trust_id=None):
+ def is_not_expired(self, now, ref):
+ return not ref.get('expires') and ref.get('expires') < now
+
+ def is_expired(self, now, ref):
+ return ref.get('expires') and ref.get('expires') < now
+
+ def trust_matches(self, trust_id, ref):
+ return ref.get('trust_id') and ref['trust_id'] == trust_id
+
+ def _list_tokens_for_trust(self, trust_id):
tokens = []
now = timeutils.utcnow()
for token, ref in self.db.items():
- if not token.startswith('token-'):
+ if not token.startswith('token-') or self.is_expired(now, ref):
continue
- user = ref.get('user')
- if not user:
- continue
- if user.get('id') != user_id:
- continue
- if ref.get('expires') and ref.get('expires') < now:
+ ref_trust_id = ref.get('trust_id')
+ if self.trust_matches(trust_id, ref):
+ tokens.append(token.split('-', 1)[1])
+ return tokens
+
+ def _list_tokens_for_user(self, user_id, tenant_id=None):
+ def user_matches(user_id, ref):
+ return ref.get('user') and ref['user'].get('id') == user_id
+
+ def tenant_matches(tenant_id, ref):
+ return ((tenant_id is None) or
+ (ref.get('tenant') and
+ ref['tenant'].get('id') == tenant_id))
+
+ tokens = []
+ now = timeutils.utcnow()
+ for token, ref in self.db.items():
+ if not token.startswith('token-') or self.is_expired(now, ref):
continue
- if tenant_id is not None:
- tenant = ref.get('tenant')
- if not tenant:
- continue
- if tenant.get('id') != tenant_id:
- continue
- if trust_id is not None:
- trust = ref.get('trust_id')
- if not trust:
- continue
- tokens.append(token.split('-', 1)[1])
+ else:
+ if (user_matches(user_id, ref) and
+ tenant_matches(tenant_id, ref)):
+ tokens.append(token.split('-', 1)[1])
return tokens
+ def list_tokens(self, user_id, tenant_id=None, trust_id=None):
+ if trust_id:
+ return self._list_tokens_for_trust(trust_id)
+ else:
+ return self._list_tokens_for_user(user_id, tenant_id)
+
def list_revoked_tokens(self):
tokens = []
for token, token_ref in self.db.items():
diff --git a/keystone/token/backends/sql.py b/keystone/token/backends/sql.py
index 62122ebe..561bb027 100644
--- a/keystone/token/backends/sql.py
+++ b/keystone/token/backends/sql.py
@@ -75,7 +75,33 @@ class Token(sql.Base, token.Driver):
token_ref.valid = False
session.flush()
- def list_tokens(self, user_id, tenant_id=None, trust_id=None):
+ def _list_tokens_for_trust(self, trust_id):
+ def trust_matches(trust_id, token_ref_dict):
+ return (token_ref_dict.get('trust_id') and
+ token_ref_dict['trust_id'] == trust_id)
+
+ session = self.get_session()
+ tokens = []
+ now = timeutils.utcnow()
+ query = session.query(TokenModel)
+ query = query.filter(TokenModel.expires > now)
+ token_references = query.filter_by(valid=True)
+ for token_ref in token_references:
+ token_ref_dict = token_ref.to_dict()
+ if trust_matches(trust_id, token_ref_dict):
+ tokens.append(token_ref['id'])
+ return tokens
+
+ def _list_tokens_for_user(self, user_id, tenant_id=None):
+ def user_matches(user_id, token_ref_dict):
+ return (token_ref_dict.get('user') and
+ token_ref_dict['user'].get('id') == user_id)
+
+ def tenant_matches(tenant_id, token_ref_dict):
+ return ((tenant_id is None) or
+ (token_ref_dict.get('tenant') and
+ token_ref_dict['tenant'].get('id') == tenant_id))
+
session = self.get_session()
tokens = []
now = timeutils.utcnow()
@@ -84,26 +110,17 @@ class Token(sql.Base, token.Driver):
token_references = query.filter_by(valid=True)
for token_ref in token_references:
token_ref_dict = token_ref.to_dict()
- user = token_ref_dict.get('user')
- if not user:
- continue
- if user.get('id') != user_id:
- continue
- if tenant_id is not None:
- tenant = token_ref_dict.get('tenant')
- if not tenant:
- continue
- if tenant.get('id') != tenant_id:
- continue
- if trust_id is not None:
- token_trust_id = token_ref_dict.get('trust_id')
- if not token_trust_id:
- continue
- if token_trust_id != trust_id:
- continue
- tokens.append(token_ref['id'])
+ if (user_matches(user_id, token_ref_dict) and
+ tenant_matches(tenant_id, token_ref_dict)):
+ tokens.append(token_ref['id'])
return tokens
+ def list_tokens(self, user_id, tenant_id=None, trust_id=None):
+ if trust_id:
+ return self._list_tokens_for_trust(trust_id)
+ else:
+ return self._list_tokens_for_user(user_id, tenant_id)
+
def list_revoked_tokens(self):
session = self.get_session()
tokens = []
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 8c8feba0..1436d8ec 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -20,6 +20,7 @@ import uuid
from keystone import auth
from keystone import config
from keystone import exception
+from keystone import identity
from keystone.openstack.common import timeutils
from keystone import test
from keystone import token
@@ -639,6 +640,23 @@ class AuthWithTrust(AuthTest):
"user_foo has three roles, but the token should"
" only get the two roles specified in the trust.")
+ def assert_token_count_for_trust(self, expected_value):
+ tokens = self.trust_controller.token_api.list_tokens(
+ {}, self.trustee['id'], trust_id=self.new_trust['id'])
+ token_count = len(tokens)
+ self.assertEquals(token_count, expected_value)
+
+ def test_delete_tokens_for_user_invalidates_tokens_from_trust(self):
+ self.assert_token_count_for_trust(0)
+ auth_response = self.fetch_v2_token_from_trust()
+ self.assert_token_count_for_trust(1)
+ identity.controllers.delete_tokens_for_user(
+ {},
+ self.trust_controller.token_api,
+ self.trust_controller.trust_api,
+ self.trustee['id'])
+ self.assert_token_count_for_trust(0)
+
def test_token_from_trust_cant_get_another_token(self):
auth_response = self.fetch_v2_token_from_trust()
trust_token_id = auth_response['access']['token']['id']