summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/source/controllingservers.rst4
-rw-r--r--doc/source/extensions.rst9
-rw-r--r--etc/keystone.conf14
-rw-r--r--etc/ldap.conf2
-rw-r--r--etc/memcache.conf2
-rw-r--r--etc/ssl.conf1
-rwxr-xr-xkeystone/backends/__init__.py4
-rw-r--r--keystone/backends/ldap/fakeldap.py6
-rw-r--r--keystone/contrib/extensions/admin/hpidm/__init__.py33
-rw-r--r--keystone/contrib/extensions/admin/hpidm/extension.json21
-rw-r--r--keystone/contrib/extensions/admin/hpidm/extension.xml21
-rw-r--r--keystone/controllers/token.py7
-rw-r--r--keystone/logic/extension_reader.py26
-rwxr-xr-xkeystone/logic/service.py121
-rw-r--r--keystone/manage/__init__.py2
-rw-r--r--keystone/manage/api.py10
-rw-r--r--keystone/middleware/auth_token.py9
-rwxr-xr-xkeystone/middleware/quantum_auth_token.py13
-rw-r--r--keystone/test/__init__.py12
-rw-r--r--keystone/test/client/test_extensions.py11
-rw-r--r--keystone/test/etc/ldap.conf.template2
-rw-r--r--keystone/test/etc/memcache.conf.template2
-rw-r--r--keystone/test/etc/sql.conf.template2
-rw-r--r--keystone/test/etc/sql_no_hpidm.conf.template54
-rw-r--r--keystone/test/etc/ssl.conf.template1
-rw-r--r--keystone/test/functional/common.py86
-rw-r--r--keystone/test/functional/test_extensions.py258
-rw-r--r--keystone/test/sampledata.py12
-rwxr-xr-xrun_tests.py1
-rwxr-xr-xrun_tests.sh1
30 files changed, 703 insertions, 44 deletions
diff --git a/doc/source/controllingservers.rst b/doc/source/controllingservers.rst
index da3ef38a..ba8bfc06 100644
--- a/doc/source/controllingservers.rst
+++ b/doc/source/controllingservers.rst
@@ -124,7 +124,7 @@ Here is an example showing how you can manually start the ``keystone-auth`` serv
keystone-legacy-auth: INFO certfile /etc/keystone/ssl/certs/keystone.pem
keystone-legacy-auth: INFO debug True
keystone-legacy-auth: INFO default_store sqlite
- keystone-legacy-auth: INFO extensions osksadm,oskscatalog
+ keystone-legacy-auth: INFO extensions osksadm,oskscatalog,hpidm
keystone-legacy-auth: INFO hash-password True
keystone-legacy-auth: INFO keyfile /etc/keystone/ssl/private/keystonekey.pem
keystone-legacy-auth: INFO keystone-admin-role Admin
@@ -156,7 +156,7 @@ Here is an example showing how you can manually start the ``keystone-auth`` serv
admin : INFO certfile /etc/keystone/ssl/certs/keystone.pem
admin : INFO debug True
admin : INFO default_store sqlite
- admin : INFO extensions osksadm,oskscatalog
+ admin : INFO extensions osksadm,oskscatalog,hpidm
admin : INFO hash-password True
admin : INFO keyfile /etc/keystone/ssl/private/keystonekey.pem
admin : INFO keystone-admin-role Admin
diff --git a/doc/source/extensions.rst b/doc/source/extensions.rst
index 9595e1f6..fa6be0df 100644
--- a/doc/source/extensions.rst
+++ b/doc/source/extensions.rst
@@ -71,6 +71,15 @@ RAX-KEY
This is an Admin and Service API extension.
+HP-IDM
+
+ This extension adds capability to filter roles with optional service IDs
+ for token validation to mitigate security risks with role name conflicts.
+ See https://bugs.launchpad.net/keystone/+bug/890411 for more details.
+
+ This is an Admin API extension. Applicable to validate token (GET)
+ and check token (HEAD) APIs only.
+
.. note::
The included extensions are in the process of being rewritten. Currently
diff --git a/etc/keystone.conf b/etc/keystone.conf
index e46e5c7d..d7c0e036 100644
--- a/etc/keystone.conf
+++ b/etc/keystone.conf
@@ -28,7 +28,7 @@ service-header-mappings = {
#List of extensions currently loaded.
#Refer docs for list of supported extensions.
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
# Address to bind the API server
# TODO Properties defined within app not available via pipeline.
@@ -74,6 +74,18 @@ keystone-service-admin-role = KeystoneServiceAdmin
#Tells whether password user need to be hashed in the backend
hash-password = True
+# This property is applicable to hpidm extension only.
+# It will be ignored if hpidm extension is disabled.
+#
+# Specify the global service ID to dictate how the global roles
+# are to be returned/processed in validate token call. Notice
+# that middle-ware or API clients must specify the exact same
+# global service ID in order for Keystone to retrieve the
+# global roles in validate token call. Otherwise, it will
+# likely result in a 401 since the mismatched global ID
+# may not exist in Keystone and therefore considered invalid.
+global_service_id = global
+
[keystone.backends.sqlalchemy]
# SQLAlchemy connection string for the reference implementation registry
# server. Any valid SQLAlchemy connection string is fine.
diff --git a/etc/ldap.conf b/etc/ldap.conf
index 5bb64ba3..d070e325 100644
--- a/etc/ldap.conf
+++ b/etc/ldap.conf
@@ -14,7 +14,7 @@ default_store = sqlite
log_file = keystone.ldap.log
log_dir = .
backends = keystone.backends.sqlalchemy,keystone.backends.ldap
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/etc/memcache.conf b/etc/memcache.conf
index 0e3ed450..1d27636f 100644
--- a/etc/memcache.conf
+++ b/etc/memcache.conf
@@ -23,7 +23,7 @@ default_store = sqlite
log_file = keystone.memcache.log
log_dir = .
backends = keystone.backends.sqlalchemy,keystone.backends.memcache
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/etc/ssl.conf b/etc/ssl.conf
index 7d861a9f..5ae67c6c 100644
--- a/etc/ssl.conf
+++ b/etc/ssl.conf
@@ -21,6 +21,7 @@ default_store = sqlite
log_file = keystone.ssl.log
log_dir = .
backends = keystone.backends.sqlalchemy
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/keystone/backends/__init__.py b/keystone/backends/__init__.py
index db9218d2..a892295e 100755
--- a/keystone/backends/__init__.py
+++ b/keystone/backends/__init__.py
@@ -23,6 +23,7 @@ DEFAULT_BACKENDS = 'keystone.backends.sqlalchemy'
#Configs applicable to all backends.
SHOULD_HASH_PASSWORD = None
+GLOBAL_SERVICE_ID = None # to facilitate global roles for validate tokens
def configure_backends(options):
@@ -38,3 +39,6 @@ def configure_backends(options):
if "hash-password" in options\
and ast.literal_eval(options["hash-password"]) == True:
SHOULD_HASH_PASSWORD = options["hash-password"]
+
+ global GLOBAL_SERVICE_ID
+ GLOBAL_SERVICE_ID = options.get("global_service_id", "global")
diff --git a/keystone/backends/ldap/fakeldap.py b/keystone/backends/ldap/fakeldap.py
index 09fcff0e..ee80d15a 100644
--- a/keystone/backends/ldap/fakeldap.py
+++ b/keystone/backends/ldap/fakeldap.py
@@ -93,6 +93,12 @@ def _match(key, value, attrs):
# This is a wild card search. Implemented as all or nothing for now.
if value == "*":
return True
+ if key == 'serviceId':
+ # for serviceId, the backend is returning a list of numbers
+ # make sure we convert them to strings first before comparing
+ # them
+ str_sids = map(lambda x: str(x), attrs[key])
+ return str(value) in str_sids
if key != "objectclass":
return value in attrs[key]
# it is an objectclass check, so check subclasses
diff --git a/keystone/contrib/extensions/admin/hpidm/__init__.py b/keystone/contrib/extensions/admin/hpidm/__init__.py
new file mode 100644
index 00000000..59086ef7
--- /dev/null
+++ b/keystone/contrib/extensions/admin/hpidm/__init__.py
@@ -0,0 +1,33 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2010 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from keystone.contrib.extensions.admin.extension import BaseExtensionHandler
+from keystone.controllers.token import TokenController
+
+
+class ExtensionHandler(BaseExtensionHandler):
+ def map_extension_methods(self, mapper, options):
+ token_controller = TokenController(options)
+
+ # Token Operations
+ mapper.connect("/tokens/{token_id}", controller=token_controller,
+ action="validate_token",
+ conditions=dict(method=["GET"]))
+ mapper.connect("/tokens/{tenant_id}",
+ controller=token_controller,
+ action="check_token", conditions=dict(method=["HEAD"]))
diff --git a/keystone/contrib/extensions/admin/hpidm/extension.json b/keystone/contrib/extensions/admin/hpidm/extension.json
new file mode 100644
index 00000000..5804de07
--- /dev/null
+++ b/keystone/contrib/extensions/admin/hpidm/extension.json
@@ -0,0 +1,21 @@
+{
+ "extension": {
+ "name": "HP Token Validation Extension",
+ "namespace": "http://docs.openstack.org/identity/api/ext/HP-IDM/v1.0",
+ "alias": "HP-IDM",
+ "updated": "2011-12-06T19:00:00-00:00",
+ "description": "Validate token with the optional serviceId parameter so that only the roles associated with the given service IDs are returned. See https://bugs.launchpad.net/keystone/+bug/890411 for more details.",
+ "links": [
+ {
+ "rel": "describedby",
+ "type": "application/pdf",
+ "href": "https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin-devguide.pdf"
+ },
+ {
+ "rel": "describedby",
+ "type": "application/vnd.sun.wadl+xml",
+ "href": "https://raw.github.com/openstack/keystone/master/keystone/content/admin/HP-IDM-admin.wadl"
+ }
+ ]
+ }
+}
diff --git a/keystone/contrib/extensions/admin/hpidm/extension.xml b/keystone/contrib/extensions/admin/hpidm/extension.xml
new file mode 100644
index 00000000..0482faac
--- /dev/null
+++ b/keystone/contrib/extensions/admin/hpidm/extension.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0" encoding="UTF-8"?>
+ <extension
+ xmlns="http://docs.openstack.org/common/api/v1.0"
+ xmlns:atom="http://www.w3.org/2005/Atom"
+ name="HP Token Validation Extension"
+ namespace="http://docs.openstack.org/identity/api/ext/HP-IDM/v1.0"
+ alias="HP-IDM"
+ updated="2011-12-25T17:00:00-00:00">
+
+ <description>
+ Validate token with the optional serviceId parameter so that only the roles associated with the given service IDs are returned. See https://bugs.launchpad.net/keystone/+bug/890411 for more details.
+ </description>
+
+ <atom:link rel="describedby"
+ type="application/pdf"
+ href="https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin-devguide.pdf"/>
+ <atom:link rel="describedby"
+ type="application/vnd.sun.wadl+xml"
+ href="https://github.com/openstack/keystone/raw/master/keystone/content/admin/HP-IDM-admin.wadl"/>
+ </extension>
+
diff --git a/keystone/controllers/token.py b/keystone/controllers/token.py
index dc1b8974..7e7a1ad2 100644
--- a/keystone/controllers/token.py
+++ b/keystone/controllers/token.py
@@ -26,6 +26,7 @@ calls from the request routers.
from keystone import utils
from keystone.common import wsgi
+from keystone.logic import extension_reader
from keystone.logic.types import auth
from keystone.logic.types import fault
from keystone.logic import service
@@ -68,8 +69,12 @@ class TokenController(wsgi.Controller):
def _validate_token(self, req, token_id):
"""Validates the token, and that it belongs to the specified tenant"""
belongs_to = req.GET.get('belongsTo')
+ service_ids = None
+ if extension_reader.is_extension_supported(self.options, 'hpidm'):
+ # service IDs are only relevant if hpidm extension is enabled
+ service_ids = req.GET.get('HP-IDM-serviceId')
return self.identity_service.validate_token(
- utils.get_auth_token(req), token_id, belongs_to)
+ utils.get_auth_token(req), token_id, belongs_to, service_ids)
@utils.wrap_error
def validate_token(self, req, token_id):
diff --git a/keystone/logic/extension_reader.py b/keystone/logic/extension_reader.py
index 1c7d4914..b41b841a 100644
--- a/keystone/logic/extension_reader.py
+++ b/keystone/logic/extension_reader.py
@@ -10,6 +10,29 @@ from keystone.logic.types.extension import Extensions
EXTENSIONS_PATH = 'contrib/extensions'
+def get_supported_extensions(options):
+ """
+ Returns list of supported extensions.
+ options - global configuration options
+ """
+
+ return [extension.strip() for extension in
+ options.get(CONFIG_EXTENSION_PROPERTY,
+ DEFAULT_EXTENSIONS).split(',')]
+
+
+def is_extension_supported(options, extension_name):
+ """
+ Return True if the extension is enabled, False otherwise.
+ options - global configuration options
+ extension_name - extension name
+ extension_name is case-sensitive.
+ """
+ if (extension_name is not None) and (options is not None):
+ return extension_name in get_supported_extensions(options)
+ return False
+
+
class ExtensionsReader(object):
"""Reader to read static extensions content"""
@@ -75,8 +98,7 @@ class ExtensionsReader(object):
def __get_supported_extensions(self):
""" Returns list of supported extensions."""
if self.supported_extensions is None:
- self.supported_extensions = self.options.get(
- CONFIG_EXTENSION_PROPERTY, DEFAULT_EXTENSIONS).split(',')
+ self.supported_extensions = get_supported_extensions(self.options)
return self.supported_extensions
def __get_extension_json(self, extension_name):
diff --git a/keystone/logic/service.py b/keystone/logic/service.py
index 424a7a64..fb807920 100755
--- a/keystone/logic/service.py
+++ b/keystone/logic/service.py
@@ -253,7 +253,7 @@ def get_auth_data(dtoken):
return auth.AuthData(token, user, endpoints, url_types=url_types)
-def get_validate_data(dtoken, duser):
+def get_validate_data(dtoken, duser, service_ids=None):
"""return ValidateData object for a token/user pair"""
tenant = None
if dtoken.tenant_id:
@@ -262,19 +262,14 @@ def get_validate_data(dtoken, duser):
token = auth.Token(dtoken.expires, dtoken.id, tenant)
- ts = []
- if dtoken.tenant_id:
- drole_refs = api.ROLE.ref_get_all_tenant_roles(duser.id,
- dtoken.tenant_id)
- for drole_ref in drole_refs:
- drole = api.ROLE.get(drole_ref.role_id)
- ts.append(Role(drole_ref.role_id, drole.name,
- None, drole_ref.tenant_id))
- drole_refs = api.ROLE.ref_get_all_global_roles(duser.id)
- for drole_ref in drole_refs:
- drole = api.ROLE.get(drole_ref.role_id)
- ts.append(Role(drole_ref.role_id, drole.name,
- None, drole_ref.tenant_id))
+ ts = get_tenant_roles_for_user_and_services(duser.id,
+ dtoken.tenant_id,
+ service_ids)
+ if (not dtoken.tenant_id or not service_ids or
+ (backends.GLOBAL_SERVICE_ID in service_ids)):
+ # return the global roles for unscoped tokens or
+ # its ID is in the service IDs
+ ts = ts + get_global_roles_for_user(duser.id)
# Also get the user's tenant's name
tenant_name = None
@@ -344,6 +339,90 @@ def validate_token(token_id, belongs_to=None, is_check_token=None):
return (token, user)
+def parse_service_ids(service_ids):
+ """
+ Method to parse the service IDs string.
+ service_ids -- comma-separated service IDs
+ parse and return a list of service IDs.
+ """
+ if service_ids:
+ return service_ids.rstrip().split(',')
+ return []
+
+
+def validate_service_ids(service_ids):
+ """
+ Method to validate the service IDs.
+ service_ids -- list of service IDs
+ If not service IDs or encounter an invalid service ID,
+ fault.UnauthorizedFault will be raised.
+ """
+ if not service_ids:
+ raise fault.UnauthorizedFault("Missing service IDs")
+
+ services = [api.SERVICE.get(service_id) for service_id in service_ids
+ if not service_id == backends.GLOBAL_SERVICE_ID]
+ if not all(services):
+ raise fault.UnauthorizedFault("Invalid service ID: %s" % (service_id))
+
+
+def get_roles_names_by_service_ids(service_ids):
+ """
+ Method to find all the roles for the given service IDs.
+ service_ids -- list of service IDs
+ """
+ roles = []
+ for service_id in service_ids:
+ if service_id != backends.GLOBAL_SERVICE_ID:
+ sroles = api.ROLE.get_by_service(service_id=service_id)
+ if sroles:
+ roles = roles + sroles
+ return [role.name for role in roles]
+
+
+def get_global_roles_for_user(user_id):
+ """
+ Method to return all the global roles for the given user.
+ user_id -- user ID
+ """
+ ts = []
+ drole_refs = api.ROLE.ref_get_all_global_roles(user_id)
+ for drole_ref in drole_refs:
+ drole = api.ROLE.get(drole_ref.role_id)
+ ts.append(Role(drole_ref.role_id, drole.name,
+ None, drole_ref.tenant_id))
+ return ts
+
+
+def get_tenant_roles_for_user_and_services(user_id, tenant_id,
+ service_ids):
+ """
+ Method to return all the tenant roles for the given user,
+ filtered by service ID.
+ user_id -- user ID
+ tenant_id -- tenant ID
+ service_ids -- service IDs
+ If service_ids are specified, will return the roles filtered by
+ service IDs.
+ """
+ ts = []
+ if tenant_id and user_id:
+ drole_refs = api.ROLE.ref_get_all_tenant_roles(user_id,
+ tenant_id)
+ for drole_ref in drole_refs:
+ drole = api.ROLE.get(drole_ref.role_id)
+ ts.append(Role(drole_ref.role_id, drole.name,
+ None, drole_ref.tenant_id))
+
+ if service_ids:
+ # if service IDs are specified, filter roles by service IDs
+ sroles_names = get_roles_names_by_service_ids(service_ids)
+ return [role for role in ts
+ if role.name in sroles_names]
+ else:
+ return ts
+
+
class IdentityService(object):
"""Implements the Identity service
@@ -474,10 +553,20 @@ class IdentityService(object):
return get_auth_data(dtoken)
@staticmethod
- def validate_token(admin_token, token_id, belongs_to=None):
+ def validate_token(admin_token, token_id, belongs_to=None,
+ service_ids=None):
validate_service_admin_token(admin_token)
(token, user) = validate_token(token_id, belongs_to, True)
- return get_validate_data(token, user)
+ if service_ids and (token.tenant_id or belongs_to):
+ # scope token, validate the service IDs if present
+ service_ids = parse_service_ids(service_ids)
+ validate_service_ids(service_ids)
+ auth_data = get_validate_data(token, user, service_ids)
+ if service_ids and (token.tenant_id or belongs_to):
+ # we have service Ids and scope token, make sure we have some roles
+ if not auth_data.user.role_refs.values:
+ raise fault.UnauthorizedFault("No roles found for scope token")
+ return auth_data
@staticmethod
def revoke_token(admin_token, token_id):
diff --git a/keystone/manage/__init__.py b/keystone/manage/__init__.py
index 518a00e0..70230341 100644
--- a/keystone/manage/__init__.py
+++ b/keystone/manage/__init__.py
@@ -180,7 +180,7 @@ def process(*args):
raise optparse.OptParseError(ACTION_NOT_SUPPORTED % ('tenants'))
elif (object_type, action) == ('role', 'add'):
- if api.add_role(name=object_id):
+ if api.add_role(name=object_id, service_name=optional_arg(args, 3)):
print "SUCCESS: Role %s created successfully." % object_id
elif (object_type, action) == ('role', 'list'):
diff --git a/keystone/manage/api.py b/keystone/manage/api.py
index 02aea17a..a87b88be 100644
--- a/keystone/manage/api.py
+++ b/keystone/manage/api.py
@@ -54,9 +54,17 @@ def disable_tenant(name):
return db_api.TENANT.update(obj.id, obj)
-def add_role(name):
+def add_role(name, service_name=None):
obj = db_models.Role()
obj.name = name
+
+ names = name.split(":")
+ if len(names) == 2:
+ service_name = names[0] or service_name
+ if service_name:
+ # we have a role with service prefix, fill in the service ID
+ service = db_api.SERVICE.get_by_name(name=service_name)
+ obj.service_id = service.id
return db_api.ROLE.create(obj)
diff --git a/keystone/middleware/auth_token.py b/keystone/middleware/auth_token.py
index 3277dc58..0fce68a1 100644
--- a/keystone/middleware/auth_token.py
+++ b/keystone/middleware/auth_token.py
@@ -104,6 +104,7 @@ import json
import os
from paste.deploy import loadapp
import time
+import urllib
from urlparse import urlparse
from webob.exc import HTTPUnauthorized
from webob.exc import Request, Response
@@ -159,6 +160,11 @@ class AuthProtocol(object):
self.service_protocol = conf.get('service_protocol', 'https')
self.service_host = conf.get('service_host')
service_port = conf.get('service_port')
+ service_ids = conf.get('service_ids')
+ self.serviceId_qs = ''
+ if service_ids:
+ self.serviceId_qs = '?HP-IDM-serviceId=%s' % \
+ (urllib.quote(service_ids))
if service_port:
self.service_port = int(service_port)
self.service_url = '%s://%s:%s' % (self.service_protocol,
@@ -429,7 +435,8 @@ class AuthProtocol(object):
# "X-Auth-Token": admin_token}
# we're using a test token from the ini file for now
conn = http_connect(self.auth_host, self.auth_port, 'GET',
- '/v2.0/tokens/%s' % claims, headers=headers,
+ '/v2.0/tokens/%s%s' % (claims, self.serviceId_qs),
+ headers=headers,
ssl=(self.auth_protocol == 'https'),
key_file=self.key_file, cert_file=self.cert_file,
timeout=self.auth_timeout)
diff --git a/keystone/middleware/quantum_auth_token.py b/keystone/middleware/quantum_auth_token.py
index 77ba3606..58f9f2df 100755
--- a/keystone/middleware/quantum_auth_token.py
+++ b/keystone/middleware/quantum_auth_token.py
@@ -71,6 +71,7 @@ HTTP_X_AUTHORIZATION
import httplib
import json
import logging
+import urllib
from urlparse import urlparse
from webob.exc import HTTPUnauthorized, Request, Response
@@ -129,11 +130,17 @@ class AuthProtocol(object):
self.admin_user = conf.get('auth_admin_user')
self.admin_password = conf.get('auth_admin_password')
self.admin_token = conf.get('auth_admin_token')
+ # bind to one or more service instances
+ service_ids = conf.get('service_ids')
+ self.serviceId_qs = ''
+ if service_ids:
+ self.serviceId_qs = '?HP-IDM-serviceId=%s' % \
+ (urllib.quote(service_ids))
def _build_token_uri(self, claims=None):
- uri = "/v" + self.auth_api_version + "/tokens" + \
- (claims and '/' + claims or '')
- return uri
+ claim_str = "/%s" % claims if claims else ""
+ return "/v%s/tokens%s%s" % (self.auth_api_version, claim_str,
+ self.serviceId_qs or '')
def __init__(self, app, conf):
""" Common initialization code """
diff --git a/keystone/test/__init__.py b/keystone/test/__init__.py
index 5e0d7c8a..8fa43744 100644
--- a/keystone/test/__init__.py
+++ b/keystone/test/__init__.py
@@ -353,6 +353,7 @@ class KeystoneTest(object):
"""
config_params = {'test_dir': TEST_DIR, 'base_dir': BASE_DIR}
isSsl = False
+ hpidmDisabled = False
config_name = None
test_files = None
server = None
@@ -398,6 +399,10 @@ class KeystoneTest(object):
if (self.isSsl == True):
os.environ['cert_file'] = TEST_CERT
+ # indicating HP-IDM is disabled
+ if self.hpidmDisabled:
+ os.environ['HP-IDM_Disabled'] = 'True'
+
# run the keystone server
logger.info("Starting the keystone server...")
@@ -659,3 +664,10 @@ class LDAPTest(SQLTest):
from keystone.backends.ldap.fakeldap import FakeShelve
db = FakeShelve().get_instance()
db.clear()
+
+
+class ClientWithoutHPIDMTest(ClientTests):
+ """Test with HP-IDM disabled to make sure it is backward compatible"""
+ config_name = 'sql_no_hpidm.conf.template'
+ hpidmDisabled = True
+ test_files = ('keystone.nohpidm.db',)
diff --git a/keystone/test/client/test_extensions.py b/keystone/test/client/test_extensions.py
index 551df491..bb5d40a8 100644
--- a/keystone/test/client/test_extensions.py
+++ b/keystone/test/client/test_extensions.py
@@ -1,3 +1,4 @@
+import os
import unittest2 as unittest
from keystone.test.functional import common
@@ -28,13 +29,18 @@ class TestAdminExtensions(common.ApiTestCase):
self.assertIsNotNone(content['extensions']['values'])
found_osksadm = False
found_oskscatalog = False
+ found_hpidm = False
for value in content['extensions']['values']:
if value['extension']['alias'] == 'OS-KSADM':
found_osksadm = True
if value['extension']['alias'] == 'OS-KSCATALOG':
found_oskscatalog = True
+ if value['extension']['alias'] == 'HP-IDM':
+ found_hpidm = True
self.assertTrue(found_osksadm, "Missing OS-KSADM extension.")
self.assertTrue(found_oskscatalog, "Missing OS-KSCATALOG extension.")
+ if not common.isSsl() and 'HP-IDM_Disabled' not in os.environ:
+ self.assertTrue(found_hpidm, "Missing HP-IDM extension.")
def test_extensions_xml(self):
r = self.admin_request(path='/extensions.xml')
@@ -44,13 +50,18 @@ class TestAdminExtensions(common.ApiTestCase):
"{http://docs.openstack.org/common/api/v1.0}extension")
found_osksadm = False
found_oskscatalog = False
+ found_hpidm = False
for extension in extensions:
if extension.get("alias") == 'OS-KSADM':
found_osksadm = True
if extension.get("alias") == 'OS-KSCATALOG':
found_oskscatalog = True
+ if extension.get("alias") == 'HP-IDM':
+ found_hpidm = True
self.assertTrue(found_osksadm, "Missing OS-KSADM extension.")
self.assertTrue(found_oskscatalog, "Missing OS-KSCATALOG extension.")
+ if not common.isSsl() and 'HP-IDM_Disabled' not in os.environ:
+ self.assertTrue(found_hpidm, "Missing HP-IDM extension.")
if __name__ == '__main__':
diff --git a/keystone/test/etc/ldap.conf.template b/keystone/test/etc/ldap.conf.template
index 0ec3934b..feb73ecd 100644
--- a/keystone/test/etc/ldap.conf.template
+++ b/keystone/test/etc/ldap.conf.template
@@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.ldap.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy,keystone.backends.ldap
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/keystone/test/etc/memcache.conf.template b/keystone/test/etc/memcache.conf.template
index 80c39153..41bbb0b6 100644
--- a/keystone/test/etc/memcache.conf.template
+++ b/keystone/test/etc/memcache.conf.template
@@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.memcache.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy,keystone.backends.memcache
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/keystone/test/etc/sql.conf.template b/keystone/test/etc/sql.conf.template
index 37864e7c..d1aec939 100644
--- a/keystone/test/etc/sql.conf.template
+++ b/keystone/test/etc/sql.conf.template
@@ -5,7 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.sql.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy
-extensions= osksadm,oskscatalog
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/keystone/test/etc/sql_no_hpidm.conf.template b/keystone/test/etc/sql_no_hpidm.conf.template
new file mode 100644
index 00000000..fde2eb44
--- /dev/null
+++ b/keystone/test/etc/sql_no_hpidm.conf.template
@@ -0,0 +1,54 @@
+[DEFAULT]
+verbose = False
+debug = False
+default_store = sqlite
+log_file = %(test_dir)s/keystone.sql.log
+log_dir = %(test_dir)s
+backends = keystone.backends.sqlalchemy
+extensions= osksadm, oskscatalog
+service-header-mappings = {
+ 'nova' : 'X-Server-Management-Url',
+ 'swift' : 'X-Storage-Url',
+ 'cdn' : 'X-CDN-Management-Url'}
+service_host = 0.0.0.0
+service_port = %(service_port)s
+service_ssl = False
+admin_host = 0.0.0.0
+admin_port = %(admin_port)s
+admin_ssl = False
+keystone-admin-role = Admin
+keystone-service-admin-role = KeystoneServiceAdmin
+hash-password = True
+
+[keystone.backends.sqlalchemy]
+sql_connection = sqlite://
+sql_idle_timeout = 30
+backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Tenant', 'User', 'UserRoleAssociation', 'Role', 'Token', 'Service']
+
+[pipeline:admin]
+pipeline =
+ urlrewritefilter
+ d5_compat
+ admin_api
+
+[pipeline:keystone-legacy-auth]
+pipeline =
+ urlrewritefilter
+ legacy_auth
+ d5_compat
+ service_api
+
+[app:service_api]
+paste.app_factory = keystone.server:service_app_factory
+
+[app:admin_api]
+paste.app_factory = keystone.server:admin_app_factory
+
+[filter:urlrewritefilter]
+paste.filter_factory = keystone.middleware.url:filter_factory
+
+[filter:d5_compat]
+paste.filter_factory = keystone.frontends.d5_compat:filter_factory
+
+[filter:legacy_auth]
+paste.filter_factory = keystone.frontends.legacy_token_auth:filter_factory
diff --git a/keystone/test/etc/ssl.conf.template b/keystone/test/etc/ssl.conf.template
index 3d05710e..1f401975 100644
--- a/keystone/test/etc/ssl.conf.template
+++ b/keystone/test/etc/ssl.conf.template
@@ -5,6 +5,7 @@ default_store = sqlite
log_file = %(test_dir)s/keystone.ssl.log
log_dir = %(test_dir)s
backends = keystone.backends.sqlalchemy
+extensions= osksadm, oskscatalog, hpidm
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
diff --git a/keystone/test/functional/common.py b/keystone/test/functional/common.py
index 5372e2ac..358de51b 100644
--- a/keystone/test/functional/common.py
+++ b/keystone/test/functional/common.py
@@ -229,7 +229,7 @@ class ApiTestCase(RestfulTestCase):
"'Tenant', 'User', 'Credentials', 'EndpointTemplates', "
"'Token', 'Service']",
},
- 'extensions': 'osksadm,oskscatalog',
+ 'extensions': 'osksadm,oskscatalog,hpidm',
'keystone-admin-role': 'Admin',
'keystone-service-admin-role': 'KeystoneServiceAdmin',
'hash-password': 'True',
@@ -1214,12 +1214,15 @@ class FunctionalTestCase(ApiTestCase):
return self.delete_user_role(user_id, tenant_id, **kwargs)
def create_role(self, role_name=None, role_description=None,
- service_id=None, **kwargs):
+ service_id=None, service_name=None, **kwargs):
"""Creates a role for testing
The role name and description are generated from UUIDs.
"""
- role_name = optional_str(role_name)
+ if service_name and not role_name:
+ role_name = "%s:%s" % (service_name, optional_str(role_name))
+ else:
+ role_name = optional_str(role_name)
role_description = optional_str(role_description)
data = {
@@ -1517,6 +1520,18 @@ class MiddlewareTestCase(FunctionalTestCase):
"""
use_server = True
+ def _setup_test_middleware(self):
+ test_middleware = None
+ if isinstance(self.middleware, tuple):
+ test_middleware = HeaderApp()
+ for filter in self.middleware:
+ test_middleware = \
+ filter.filter_factory(self.settings)(test_middleware)
+ else:
+ test_middleware = \
+ self.middleware.filter_factory(self.settings)(HeaderApp())
+ return test_middleware
+
def setUp(self, middleware, settings=None):
super(MiddlewareTestCase, self).setUp()
if settings is None:
@@ -1535,14 +1550,9 @@ class MiddlewareTestCase(FunctionalTestCase):
cert_file = isSsl()
if cert_file:
settings['certfile'] = cert_file
- if isinstance(middleware, tuple):
- self.test_middleware = HeaderApp()
- for filter in middleware:
- self.test_middleware = \
- filter.filter_factory(settings)(self.test_middleware)
- else:
- self.test_middleware = \
- middleware.filter_factory(settings)(HeaderApp())
+ self.settings = settings
+ self.middleware = middleware
+ self.test_middleware = self._setup_test_middleware()
name = unique_str()
r = self.create_tenant(tenant_name=name, assert_status=201)
@@ -1571,6 +1581,60 @@ class MiddlewareTestCase(FunctionalTestCase):
self.create_endpoint_for_tenant(self.tenant['id'],
self.endpoint_templates[x]['id'])
+ @unittest.skipIf(isSsl() or 'HP-IDM_Disabled' in os.environ,
+ "Skipping SSL or HP-IDM tests")
+ def test_with_service_id(self):
+ # create a service role so the scope token validation will succeed
+ role_resp = self.create_role(service_name=self.services[0]['name'])
+ role = role_resp.json['role']
+ self.grant_role_to_user(self.tenant_user['id'],
+ role['id'], self.tenant['id'])
+ auth_resp = self.authenticate(self.tenant_user['name'],
+ self.tenant_user['password'],
+ self.tenant['id'], assert_status=200)
+ user_token = auth_resp.json['access']['token']['id']
+ self.settings['service_ids'] = "%s" % self.services[0]['id']
+ test_middleware = self._setup_test_middleware()
+ resp = Request.blank('/',
+ headers={'X-Auth-Token': user_token}) \
+ .get_response(test_middleware)
+ self.assertEquals(resp.status_int, 200)
+
+ # now give it a bogus service ID to make sure we get a 401
+ self.settings['service_ids'] = "boguzz"
+ test_middleware = self._setup_test_middleware()
+ resp = Request.blank('/',
+ headers={'X-Auth-Token': user_token}) \
+ .get_response(test_middleware)
+ self.assertEquals(resp.status_int, 401)
+
+ @unittest.skipUnless(not isSsl() and 'HP-IDM_Disabled' in os.environ,
+ "Skipping since HP-IDM is enabled")
+ def test_with_service_id_with_hpidm_disabled(self):
+ # create a service role so the scope token validation will succeed
+ role_resp = self.create_role(service_name=self.services[0]['name'])
+ role = role_resp.json['role']
+ self.grant_role_to_user(self.tenant_user['id'],
+ role['id'], self.tenant['id'])
+ auth_resp = self.authenticate(self.tenant_user['name'],
+ self.tenant_user['password'],
+ self.tenant['id'], assert_status=200)
+ user_token = auth_resp.json['access']['token']['id']
+ self.settings['service_ids'] = "%s" % self.services[0]['id']
+ test_middleware = self._setup_test_middleware()
+ resp = Request.blank('/',
+ headers={'X-Auth-Token': user_token}) \
+ .get_response(test_middleware)
+ self.assertEquals(resp.status_int, 200)
+
+ # now give it a bogus service ID to make sure it got ignored
+ self.settings['service_ids'] = "boguzz"
+ test_middleware = self._setup_test_middleware()
+ resp = Request.blank('/',
+ headers={'X-Auth-Token': user_token}) \
+ .get_response(test_middleware)
+ self.assertEquals(resp.status_int, 200)
+
def test_401_without_token(self):
resp = Request.blank('/').get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
diff --git a/keystone/test/functional/test_extensions.py b/keystone/test/functional/test_extensions.py
new file mode 100644
index 00000000..ba090bda
--- /dev/null
+++ b/keystone/test/functional/test_extensions.py
@@ -0,0 +1,258 @@
+import unittest2 as unittest
+from keystone.test.functional import common
+
+
+class TestHPIDMTokensExtension(common.FunctionalTestCase):
+ """Test HP-IDM token validation extension"""
+
+ def setUp(self):
+ super(TestHPIDMTokensExtension, self).setUp()
+ password = common.unique_str()
+ self.user = self.create_user(user_password=password).json['user']
+ self.user['password'] = password
+ self.tenant = self.create_tenant().json['tenant']
+ self.service = self.create_service().json['OS-KSADM:service']
+ r = self.create_role(service_name=self.service['name'])
+ self.role = r.json['role']
+ self.another_service = self.create_service().json['OS-KSADM:service']
+ self.service_with_no_users = self.create_service().\
+ json['OS-KSADM:service']
+ ar = self.create_role(service_name=self.another_service['name'])
+ self.another_role = ar.json['role']
+ rnu = self.create_role(service_name=self.service_with_no_users['name'])
+ self.role_with_no_users = rnu.json['role']
+ rns = self.create_role()
+ self.role_with_no_service = rns.json['role']
+ self.grant_role_to_user(self.user['id'],
+ self.role['id'], self.tenant['id'])
+ self.grant_role_to_user(self.user['id'],
+ self.role_with_no_service['id'],
+ self.tenant['id'])
+ self.grant_role_to_user(self.user['id'],
+ self.another_role['id'], self.tenant['id'])
+ self.global_role = self.create_role().json['role']
+ # crete a global role
+ self.put_user_role(self.user['id'], self.global_role['id'], None)
+
+ def get_token_belongsto(self, token_id, tenant_id, service_ids, **kwargs):
+ """GET /tokens/{token_id}?belongsTo={tenant_id}
+ [&HP-IDM-serviceId={service_ids}]"""
+ serviceId_qs = ""
+ if service_ids:
+ serviceId_qs = "&HP-IDM-serviceId=%s" % (service_ids)
+ return self.admin_request(method='GET',
+ path='/tokens/%s?belongsTo=%s%s' % (token_id, tenant_id,
+ serviceId_qs), **kwargs)
+
+ def check_token_belongs_to(self, token_id, tenant_id, service_ids,
+ **kwargs):
+ """HEAD /tokens/{token_id}?belongsTo={tenant_id}
+ [&HP-IDM-serviceId={service_ids}]"""
+ serviceId_qs = ""
+ if service_ids:
+ serviceId_qs = "&HP-IDM-serviceId=%s" % (service_ids)
+ return self.admin_request(method='HEAD',
+ path='/tokens/%s?belongsTo=%s%s' % (token_id, tenant_id,
+ serviceId_qs), **kwargs)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_with_serviceId(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ # And an admin should be able to validate that our new token is scoped
+ r = self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=self.service['id'])
+ access = r.json['access']
+
+ self.assertEqual(access['user']['id'], self.user['id'])
+ self.assertEqual(access['user']['name'], self.user['name'])
+ self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(access['token']['tenant']['name'],
+ self.tenant['name'])
+
+ # make sure only the service roles are returned
+ self.assertIsNotNone(access['user'].get('roles'))
+ self.assertEqual(len(access['user']['roles']), 1)
+ self.assertEqual(access['user']['roles'][0]['name'],
+ self.role['name'])
+
+ # make sure check token also works
+ self.check_token_belongs_to(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=self.service['id'],
+ assert_status=200)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_with_all_serviceId(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ # And an admin should be able to validate that our new token is scoped
+ service_ids = "%s,%s" % \
+ (self.service['id'], self.another_service['id'])
+ r = self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids)
+ access = r.json['access']
+
+ self.assertEqual(access['user']['id'], self.user['id'])
+ self.assertEqual(access['user']['name'], self.user['name'])
+ self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(access['token']['tenant']['name'],
+ self.tenant['name'])
+
+ # make sure only the service roles are returned
+ self.assertIsNotNone(access['user'].get('roles'))
+ self.assertEqual(len(access['user']['roles']), 2)
+ role_names = map(lambda x: x['name'], access['user']['roles'])
+ self.assertTrue(self.role['name'] in role_names)
+ self.assertTrue(self.another_role['name'] in role_names)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_with_no_user_service(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ # And an admin should be able to validate that our new token is scoped
+ service_ids = "%s,%s,%s" % (self.service['id'],
+ self.another_service['id'],
+ self.service_with_no_users['id'])
+ r = self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids)
+ access = r.json['access']
+
+ self.assertEqual(access['user']['id'], self.user['id'])
+ self.assertEqual(access['user']['name'], self.user['name'])
+ self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(access['token']['tenant']['name'],
+ self.tenant['name'])
+
+ # make sure only the service roles are returned, excluding the one
+ # with no users
+ self.assertIsNotNone(access['user'].get('roles'))
+ self.assertEqual(len(access['user']['roles']), 2)
+ role_names = map(lambda x: x['name'], access['user']['roles'])
+ self.assertTrue(self.role['name'] in role_names)
+ self.assertTrue(self.another_role['name'] in role_names)
+
+ # make sure check token also works
+ self.check_token_belongs_to(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids,
+ assert_status=200)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_without_serviceId(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ # And an admin should be able to validate that our new token is scoped
+ r = self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=None)
+ access = r.json['access']
+
+ self.assertEqual(access['user']['id'], self.user['id'])
+ self.assertEqual(access['user']['name'], self.user['name'])
+ self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(access['token']['tenant']['name'],
+ self.tenant['name'])
+
+ # make sure all the roles are returned
+ self.assertIsNotNone(access['user'].get('roles'))
+ self.assertEqual(len(access['user']['roles']), 4)
+ role_names = map(lambda x: x['name'], access['user']['roles'])
+ self.assertTrue(self.role['name'] in role_names)
+ self.assertTrue(self.another_role['name'] in role_names)
+ self.assertTrue(self.global_role['name'] in role_names)
+ self.assertTrue(self.role_with_no_service['name'] in role_names)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_with_global_service_id(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ service_ids = "%s,%s,global" % (self.service['id'],
+ self.another_service['id'])
+ r = self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids)
+ access = r.json['access']
+
+ self.assertEqual(access['user']['id'], self.user['id'])
+ self.assertEqual(access['user']['name'], self.user['name'])
+ self.assertEqual(access['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(access['token']['tenant']['name'],
+ self.tenant['name'])
+
+ # make sure only the service roles are returned
+ self.assertIsNotNone(access['user'].get('roles'))
+ self.assertEqual(len(access['user']['roles']), 3)
+ role_names = map(lambda x: x['name'], access['user']['roles'])
+ self.assertTrue(self.role['name'] in role_names)
+ self.assertTrue(self.another_role['name'] in role_names)
+ self.assertTrue(self.global_role['name'] in role_names)
+
+ @unittest.skipIf(common.isSsl(),
+ "Skipping SSL tests")
+ def test_token_validation_with_bogus_service_id(self):
+ scoped = self.post_token(as_json={
+ 'auth': {
+ 'passwordCredentials': {
+ 'username': self.user['name'],
+ 'password': self.user['password']},
+ 'tenantName': self.tenant['name']}}).json['access']
+
+ self.assertEqual(scoped['token']['tenant']['id'], self.tenant['id'])
+ self.assertEqual(scoped['token']['tenant']['name'],
+ self.tenant['name'])
+ service_ids = "%s,%s,boguzzz" % (self.service['id'],
+ self.another_service['id'])
+ self.get_token_belongsto(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids,
+ assert_status=401)
+
+ # make sure check token also works
+ self.check_token_belongs_to(token_id=scoped['token']['id'],
+ tenant_id=self.tenant['id'], service_ids=service_ids,
+ assert_status=401)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/keystone/test/sampledata.py b/keystone/test/sampledata.py
index 446c0a21..410e291d 100644
--- a/keystone/test/sampledata.py
+++ b/keystone/test/sampledata.py
@@ -6,8 +6,15 @@ DEFAULT_FIXTURE = [
('tenant', 'add', 'ANOTHER:TENANT'),
('tenant', 'add', 'project-y'),
('tenant', 'disable', 'project-y'),
+ ('tenant', 'add', 'coffee-tea'),
+# Add some services for the service roles
+ ('service', 'add', 'coffee',
+ 'coffee service', 'coffee service'),
+ ('service', 'add', 'tea',
+ 'tea house', 'tea house'),
# Users
('user', 'add', 'joeuser', 'secrete', 'customer-x'),
+ ('user', 'add', 'pete', 'secrete', 'coffee-tea'),
('user', 'add', 'joeadmin', 'secrete', 'customer-x'),
('user', 'add', 'admin', 'secrete'),
('user', 'add', 'serviceadmin', 'secrete', 'customer-x'),
@@ -24,6 +31,11 @@ DEFAULT_FIXTURE = [
('role', 'grant', 'Admin', 'nodefaulttenant', 'customer-x'),
('role', 'add', 'Member'),
('role', 'grant', 'Member', 'joeuser', 'customer-x'),
+ ('role', 'add', 'barista', 'coffee'),
+ ('role', 'add', 'barista', 'tea'),
+ ('role', 'grant', 'barista', 'pete'),
+ ('role', 'add', 'barista-global'),
+ ('role', 'grant', 'barista-global', 'pete'),
# Add Services
#1 Service Name:exampleservice Type:example type
('service', 'add', 'exampleservice',
diff --git a/run_tests.py b/run_tests.py
index 89a08f71..b6f16f14 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -28,6 +28,7 @@ TESTS = [
# But tests pass
# MemcacheTest,
test.SSLTest,
+ test.ClientWithoutHPIDMTest,
]
diff --git a/run_tests.sh b/run_tests.sh
index e7731f05..9905459f 100755
--- a/run_tests.sh
+++ b/run_tests.sh
@@ -13,6 +13,7 @@ function usage {
echo " SSLTest: runs client tests with SSL configured"
echo " LDAPTest: runs functional tests with LDAP backend"
echo " MemcacheTest: runs functional tests with memcached storing tokens"
+ echo " ClientWithoutHPIDMTest: runs client tests with HP-IDM extension disabled"
echo " Note: by default, run tests will run all suites"
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"