summaryrefslogtreecommitdiffstats
path: root/tests/test_keystoneclient.py
diff options
context:
space:
mode:
authorSascha Peilicke <saschpe@suse.de>2013-01-21 15:34:42 +0100
committerMonty Taylor <mordred@inaugust.com>2013-08-14 01:30:01 -0300
commit14e090154c10001550127628c2728013f15d4256 (patch)
tree99be365f496f7942638a95f28241839c6ca8771a /tests/test_keystoneclient.py
parent361f6fe111b3eddf013c544776d63980689dfaf5 (diff)
downloadkeystone-14e090154c10001550127628c2728013f15d4256.tar.gz
keystone-14e090154c10001550127628c2728013f15d4256.tar.xz
keystone-14e090154c10001550127628c2728013f15d4256.zip
Move 'tests' directory into 'keystone' package
Similar to a range of other components (e.g. glance,nova,...) and recent reviews by Monty. Running individual tests can be done like this: ./run_tests.sh keystone.tests.test_drivers Change-Id: I2482a48322150e5eb09b703326a94d8283f1c75b
Diffstat (limited to 'tests/test_keystoneclient.py')
-rw-r--r--tests/test_keystoneclient.py1175
1 files changed, 0 insertions, 1175 deletions
diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py
deleted file mode 100644
index 38062d4b..00000000
--- a/tests/test_keystoneclient.py
+++ /dev/null
@@ -1,1175 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright 2012 OpenStack LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-import webob
-
-from keystone import test
-
-from keystone import config
-from keystone.openstack.common import jsonutils
-from keystone.openstack.common import timeutils
-
-import default_fixtures
-
-CONF = config.CONF
-DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
-OPENSTACK_REPO = 'https://review.openstack.org/p/openstack'
-KEYSTONECLIENT_REPO = '%s/python-keystoneclient.git' % OPENSTACK_REPO
-
-
-class CompatTestCase(test.TestCase):
- def setUp(self):
- super(CompatTestCase, self).setUp()
-
- # The backends should be loaded and initialized before the servers are
- # started because the servers use the backends.
-
- self.load_backends()
- self.load_fixtures(default_fixtures)
-
- # TODO(termie): add an admin user to the fixtures and use that user
- # override the fixtures, for now
- self.metadata_foobar = self.identity_api.add_role_to_user_and_project(
- self.user_foo['id'],
- self.tenant_bar['id'],
- self.role_admin['id'])
-
- self.public_server = self.serveapp('keystone', name='main')
- self.admin_server = self.serveapp('keystone', name='admin')
-
- revdir = test.checkout_vendor(*self.get_checkout())
- self.add_path(revdir)
- self.clear_module('keystoneclient')
-
- def tearDown(self):
- self.public_server.kill()
- self.admin_server.kill()
- self.public_server = None
- self.admin_server = None
- super(CompatTestCase, self).tearDown()
-
- def _public_url(self):
- public_port = self.public_server.socket_info['socket'][1]
- return "http://localhost:%s/v2.0" % public_port
-
- def _admin_url(self):
- admin_port = self.admin_server.socket_info['socket'][1]
- return "http://localhost:%s/v2.0" % admin_port
-
- def _client(self, admin=False, **kwargs):
- from keystoneclient.v2_0 import client as ks_client
-
- url = self._admin_url() if admin else self._public_url()
- kc = ks_client.Client(endpoint=url,
- auth_url=self._public_url(),
- **kwargs)
- kc.authenticate()
- # have to manually overwrite the management url after authentication
- kc.management_url = url
- return kc
-
- def get_client(self, user_ref=None, tenant_ref=None, admin=False):
- if user_ref is None:
- user_ref = self.user_foo
- if tenant_ref is None:
- for user in default_fixtures.USERS:
- if user['id'] == user_ref['id']:
- tenant_id = user['tenants'][0]
- else:
- tenant_id = tenant_ref['id']
-
- return self._client(username=user_ref['name'],
- password=user_ref['password'],
- tenant_id=tenant_id,
- admin=admin)
-
-
-class KeystoneClientTests(object):
- """Tests for all versions of keystoneclient."""
-
- def test_authenticate_tenant_name_and_tenants(self):
- client = self.get_client()
- tenants = client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- def test_authenticate_tenant_id_and_tenants(self):
- client = self._client(username=self.user_foo['name'],
- password=self.user_foo['password'],
- tenant_id='bar')
- tenants = client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- def test_authenticate_invalid_tenant_id(self):
- from keystoneclient import exceptions as client_exceptions
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=self.user_foo['name'],
- password=self.user_foo['password'],
- tenant_id='baz')
-
- def test_authenticate_token_no_tenant(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token)
- tenants = token_client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- def test_authenticate_token_tenant_id(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token, tenant_id='bar')
- tenants = token_client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- def test_authenticate_token_invalid_tenant_id(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- token = client.auth_token
- self.assertRaises(client_exceptions.Unauthorized,
- self._client, token=token,
- tenant_id=uuid.uuid4().hex)
-
- def test_authenticate_token_invalid_tenant_name(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- token = client.auth_token
- self.assertRaises(client_exceptions.Unauthorized,
- self._client, token=token,
- tenant_name=uuid.uuid4().hex)
-
- def test_authenticate_token_tenant_name(self):
- client = self.get_client()
- token = client.auth_token
- token_client = self._client(token=token, tenant_name='BAR')
- tenants = token_client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- def test_authenticate_and_delete_token(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client(admin=True)
- token = client.auth_token
- token_client = self._client(token=token)
- tenants = token_client.tenants.list()
- self.assertEquals(tenants[0].id, self.tenant_bar['id'])
-
- client.tokens.delete(token_client.auth_token)
-
- self.assertRaises(client_exceptions.Unauthorized,
- token_client.tenants.list)
-
- def test_authenticate_no_password(self):
- from keystoneclient import exceptions as client_exceptions
-
- user_ref = self.user_foo.copy()
- user_ref['password'] = None
- self.assertRaises(client_exceptions.AuthorizationFailure,
- self.get_client,
- user_ref)
-
- def test_authenticate_no_username(self):
- from keystoneclient import exceptions as client_exceptions
-
- user_ref = self.user_foo.copy()
- user_ref['name'] = None
- self.assertRaises(client_exceptions.AuthorizationFailure,
- self.get_client,
- user_ref)
-
- def test_authenticate_disabled_tenant(self):
- from keystoneclient import exceptions as client_exceptions
-
- admin_client = self.get_client(admin=True)
-
- tenant = {
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': False,
- }
- tenant_ref = admin_client.tenants.create(
- tenant_name=tenant['name'],
- description=tenant['description'],
- enabled=tenant['enabled'])
- tenant['id'] = tenant_ref.id
-
- user = {
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- 'email': uuid.uuid4().hex,
- 'tenant_id': tenant['id'],
- }
- user_ref = admin_client.users.create(
- name=user['name'],
- password=user['password'],
- email=user['email'],
- tenant_id=user['tenant_id'])
- user['id'] = user_ref.id
-
- # password authentication
- self.assertRaises(
- client_exceptions.Unauthorized,
- self._client,
- username=user['name'],
- password=user['password'],
- tenant_id=tenant['id'])
-
- # token authentication
- client = self._client(
- username=user['name'],
- password=user['password'])
- self.assertRaises(
- client_exceptions.Unauthorized,
- self._client,
- token=client.auth_token,
- tenant_id=tenant['id'])
-
- # FIXME(ja): this test should require the "keystone:admin" roled
- # (probably the role set via --keystone_admin_role flag)
- # FIXME(ja): add a test that admin endpoint is only sent to admin user
- # FIXME(ja): add a test that admin endpoint returns unauthorized if not
- # admin
- def test_tenant_create_update_and_delete(self):
- from keystoneclient import exceptions as client_exceptions
-
- tenant_name = 'original_tenant'
- tenant_description = 'My original tenant!'
- tenant_enabled = True
- client = self.get_client(admin=True)
-
- # create, get, and list a tenant
- tenant = client.tenants.create(tenant_name=tenant_name,
- description=tenant_description,
- enabled=tenant_enabled)
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- tenant = client.tenants.get(tenant_id=tenant.id)
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- # update, get, and list a tenant
- tenant_name = 'updated_tenant'
- tenant_description = 'Updated tenant!'
- tenant_enabled = False
- tenant = client.tenants.update(tenant_id=tenant.id,
- tenant_name=tenant_name,
- enabled=tenant_enabled,
- description=tenant_description)
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- tenant = client.tenants.get(tenant_id=tenant.id)
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- tenant = [t for t in client.tenants.list() if t.id == tenant.id].pop()
- self.assertEquals(tenant.name, tenant_name)
- self.assertEquals(tenant.description, tenant_description)
- self.assertEquals(tenant.enabled, tenant_enabled)
-
- # delete, get, and list a tenant
- client.tenants.delete(tenant=tenant.id)
- self.assertRaises(client_exceptions.NotFound, client.tenants.get,
- tenant.id)
- self.assertFalse([t for t in client.tenants.list()
- if t.id == tenant.id])
-
- def test_tenant_create_no_name(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.create,
- tenant_name="")
-
- def test_tenant_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.delete,
- tenant=uuid.uuid4().hex)
-
- def test_tenant_get_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.get,
- tenant_id=uuid.uuid4().hex)
-
- def test_tenant_update_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.tenants.update,
- tenant_id=uuid.uuid4().hex)
-
- def test_tenant_list(self):
- client = self.get_client()
- tenants = client.tenants.list()
- self.assertEquals(len(tenants), 1)
-
- # Admin endpoint should return *all* tenants
- client = self.get_client(admin=True)
- tenants = client.tenants.list()
- self.assertEquals(len(tenants), len(default_fixtures.TENANTS))
-
- def test_invalid_password(self):
- from keystoneclient import exceptions as client_exceptions
-
- good_client = self._client(username=self.user_foo['name'],
- password=self.user_foo['password'])
- good_client.tenants.list()
-
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=self.user_foo['name'],
- password=uuid.uuid4().hex)
-
- def test_invalid_user_and_password(self):
- from keystoneclient import exceptions as client_exceptions
-
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=uuid.uuid4().hex,
- password=uuid.uuid4().hex)
-
- def test_change_password_invalidates_token(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client(admin=True)
-
- username = uuid.uuid4().hex
- passwd = uuid.uuid4().hex
- user = client.users.create(name=username, password=passwd,
- email=uuid.uuid4().hex)
-
- token_id = client.tokens.authenticate(username=username,
- password=passwd).id
-
- # authenticate with a token should work before a password change
- client.tokens.authenticate(token=token_id)
-
- client.users.update_password(user=user.id, password=uuid.uuid4().hex)
-
- # authenticate with a token should not work after a password change
- self.assertRaises(client_exceptions.Unauthorized,
- client.tokens.authenticate,
- token=token_id)
-
- def test_disable_user_invalidates_token(self):
- from keystoneclient import exceptions as client_exceptions
-
- admin_client = self.get_client(admin=True)
- foo_client = self.get_client(self.user_foo)
-
- admin_client.users.update_enabled(user=self.user_foo['id'],
- enabled=False)
-
- self.assertRaises(client_exceptions.Unauthorized,
- foo_client.tokens.authenticate,
- token=foo_client.auth_token)
-
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client,
- self.user_foo)
-
- def test_delete_user_invalidates_token(self):
- from keystoneclient import exceptions as client_exceptions
-
- admin_client = self.get_client(admin=True)
- client = self.get_client(admin=False)
-
- username = uuid.uuid4().hex
- password = uuid.uuid4().hex
- user_id = admin_client.users.create(
- name=username, password=password, email=uuid.uuid4().hex).id
-
- token_id = client.tokens.authenticate(
- username=username, password=password).id
-
- # token should be usable before the user is deleted
- client.tokens.authenticate(token=token_id)
-
- admin_client.users.delete(user=user_id)
-
- # authenticate with a token should not work after the user is deleted
- self.assertRaises(client_exceptions.Unauthorized,
- client.tokens.authenticate,
- token=token_id)
-
- def test_token_expiry_maintained(self):
- timeutils.set_time_override()
- foo_client = self.get_client(self.user_foo)
-
- orig_token = foo_client.service_catalog.catalog['token']
- timeutils.advance_time_seconds(1)
- reauthenticated_token = foo_client.tokens.authenticate(
- token=foo_client.auth_token)
-
- self.assertCloseEnoughForGovernmentWork(
- timeutils.parse_isotime(orig_token['expires']),
- timeutils.parse_isotime(reauthenticated_token.expires))
-
- def test_user_create_update_delete(self):
- from keystoneclient import exceptions as client_exceptions
-
- test_username = 'new_user'
- client = self.get_client(admin=True)
- user = client.users.create(name=test_username,
- password='password',
- email='user1@test.com')
- self.assertEquals(user.name, test_username)
-
- user = client.users.get(user=user.id)
- self.assertEquals(user.name, test_username)
-
- user = client.users.update(user=user,
- name=test_username,
- email='user2@test.com')
- self.assertEquals(user.email, 'user2@test.com')
-
- # NOTE(termie): update_enabled doesn't return anything, probably a bug
- client.users.update_enabled(user=user, enabled=False)
- user = client.users.get(user.id)
- self.assertFalse(user.enabled)
-
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=test_username,
- password='password')
- client.users.update_enabled(user, True)
-
- user = client.users.update_password(user=user, password='password2')
-
- self._client(username=test_username,
- password='password2')
-
- user = client.users.update_tenant(user=user, tenant='bar')
- # TODO(ja): once keystonelight supports default tenant
- # when you login without specifying tenant, the
- # token should be scoped to tenant 'bar'
-
- client.users.delete(user.id)
- self.assertRaises(client_exceptions.NotFound, client.users.get,
- user.id)
-
- # Test creating a user with a tenant (auto-add to tenant)
- user2 = client.users.create(name=test_username,
- password='password',
- email='user1@test.com',
- tenant_id='bar')
- self.assertEquals(user2.name, test_username)
-
- def test_update_default_tenant_to_existing_value(self):
- client = self.get_client(admin=True)
-
- user = client.users.create(
- name=uuid.uuid4().hex,
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex,
- tenant_id=self.tenant_bar['id'])
-
- # attempting to update the tenant with the existing value should work
- user = client.users.update_tenant(
- user=user, tenant=self.tenant_bar['id'])
-
- def test_user_create_no_name(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.users.create,
- name="",
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex)
-
- def test_user_create_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.create,
- name=uuid.uuid4().hex,
- password=uuid.uuid4().hex,
- email=uuid.uuid4().hex,
- tenant_id=uuid.uuid4().hex)
-
- def test_user_get_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.get,
- user=uuid.uuid4().hex)
-
- def test_user_list_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.list,
- tenant_id=uuid.uuid4().hex)
-
- def test_user_update_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.update,
- user=uuid.uuid4().hex)
-
- def test_user_update_tenant_404(self):
- self.skipTest('N/A')
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.update,
- user=self.user_foo['id'],
- tenant_id=uuid.uuid4().hex)
-
- def test_user_update_password_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.update_password,
- user=uuid.uuid4().hex,
- password=uuid.uuid4().hex)
-
- def test_user_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.users.delete,
- user=uuid.uuid4().hex)
-
- def test_user_list(self):
- client = self.get_client(admin=True)
- users = client.users.list()
- self.assertTrue(len(users) > 0)
- user = users[0]
- self.assertRaises(AttributeError, lambda: user.password)
-
- def test_user_get(self):
- client = self.get_client(admin=True)
- user = client.users.get(user=self.user_foo['id'])
- self.assertRaises(AttributeError, lambda: user.password)
-
- def test_role_get(self):
- client = self.get_client(admin=True)
- role = client.roles.get(role=self.role_admin['id'])
- self.assertEquals(role.id, self.role_admin['id'])
-
- def test_role_crud(self):
- from keystoneclient import exceptions as client_exceptions
-
- test_role = 'new_role'
- client = self.get_client(admin=True)
- role = client.roles.create(name=test_role)
- self.assertEquals(role.name, test_role)
-
- role = client.roles.get(role=role.id)
- self.assertEquals(role.name, test_role)
-
- client.roles.delete(role=role.id)
-
- self.assertRaises(client_exceptions.NotFound,
- client.roles.delete,
- role=role.id)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.get,
- role=role.id)
-
- def test_role_create_no_name(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.BadRequest,
- client.roles.create,
- name="")
-
- def test_role_get_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.get,
- role=uuid.uuid4().hex)
-
- def test_role_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.delete,
- role=uuid.uuid4().hex)
-
- def test_role_list_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=uuid.uuid4().hex,
- tenant=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=self.user_foo['id'],
- tenant=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.roles_for_user,
- user=uuid.uuid4().hex,
- tenant=self.tenant_bar['id'])
-
- def test_role_list(self):
- client = self.get_client(admin=True)
- roles = client.roles.list()
- # TODO(devcamcar): This assert should be more specific.
- self.assertTrue(len(roles) > 0)
-
- def test_ec2_credential_crud(self):
- client = self.get_client()
- creds = client.ec2.list(user_id=self.user_foo['id'])
- self.assertEquals(creds, [])
-
- cred = client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
- creds = client.ec2.list(user_id=self.user_foo['id'])
- self.assertEquals(creds, [cred])
-
- got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access)
- self.assertEquals(cred, got)
-
- client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
- creds = client.ec2.list(user_id=self.user_foo['id'])
- self.assertEquals(creds, [])
-
- def test_ec2_credentials_create_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- self.assertRaises(client_exceptions.NotFound,
- client.ec2.create,
- user_id=uuid.uuid4().hex,
- tenant_id=self.tenant_bar['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.ec2.create,
- user_id=self.user_foo['id'],
- tenant_id=uuid.uuid4().hex)
-
- def test_ec2_credentials_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- self.assertRaises(client_exceptions.NotFound,
- client.ec2.delete,
- user_id=uuid.uuid4().hex,
- access=uuid.uuid4().hex)
-
- def test_ec2_credentials_get_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- self.assertRaises(client_exceptions.NotFound,
- client.ec2.get,
- user_id=uuid.uuid4().hex,
- access=uuid.uuid4().hex)
-
- def test_ec2_credentials_list_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client()
- self.assertRaises(client_exceptions.NotFound,
- client.ec2.list,
- user_id=uuid.uuid4().hex)
-
- def test_ec2_credentials_list_user_forbidden(self):
- from keystoneclient import exceptions as client_exceptions
-
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.list,
- user_id=self.user_foo['id'])
-
- def test_ec2_credentials_get_user_forbidden(self):
- from keystoneclient import exceptions as client_exceptions
-
- foo = self.get_client()
- cred = foo.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
-
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.get,
- user_id=self.user_foo['id'], access=cred.access)
-
- foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
-
- def test_ec2_credentials_delete_user_forbidden(self):
- from keystoneclient import exceptions as client_exceptions
-
- foo = self.get_client()
- cred = foo.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
-
- two = self.get_client(self.user_two)
- self.assertRaises(client_exceptions.Forbidden, two.ec2.delete,
- user_id=self.user_foo['id'], access=cred.access)
-
- foo.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
-
- def test_service_crud(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
-
- service_name = uuid.uuid4().hex
- service_type = uuid.uuid4().hex
- service_desc = uuid.uuid4().hex
-
- # create & read
- service = client.services.create(name=service_name,
- service_type=service_type,
- description=service_desc)
- self.assertEquals(service_name, service.name)
- self.assertEquals(service_type, service.type)
- self.assertEquals(service_desc, service.description)
-
- service = client.services.get(id=service.id)
- self.assertEquals(service_name, service.name)
- self.assertEquals(service_type, service.type)
- self.assertEquals(service_desc, service.description)
-
- service = [x for x in client.services.list() if x.id == service.id][0]
- self.assertEquals(service_name, service.name)
- self.assertEquals(service_type, service.type)
- self.assertEquals(service_desc, service.description)
-
- # update is not supported in API v2...
-
- # delete & read
- client.services.delete(id=service.id)
- self.assertRaises(client_exceptions.NotFound,
- client.services.get,
- id=service.id)
- services = [x for x in client.services.list() if x.id == service.id]
- self.assertEquals(len(services), 0)
-
- def test_service_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.services.delete,
- id=uuid.uuid4().hex)
-
- def test_service_get_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.services.get,
- id=uuid.uuid4().hex)
-
- def test_endpoint_delete_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.endpoints.delete,
- id=uuid.uuid4().hex)
-
- def test_admin_requires_adminness(self):
- from keystoneclient import exceptions as client_exceptions
- # FIXME(ja): this should be Unauthorized
- exception = client_exceptions.ClientException
-
- two = self.get_client(self.user_two, admin=True) # non-admin user
-
- # USER CRUD
- self.assertRaises(exception,
- two.users.list)
- self.assertRaises(exception,
- two.users.get,
- user=self.user_two['id'])
- self.assertRaises(exception,
- two.users.create,
- name='oops',
- password='password',
- email='oops@test.com')
- self.assertRaises(exception,
- two.users.delete,
- user=self.user_foo['id'])
-
- # TENANT CRUD
- self.assertRaises(exception,
- two.tenants.list)
- self.assertRaises(exception,
- two.tenants.get,
- tenant_id=self.tenant_bar['id'])
- self.assertRaises(exception,
- two.tenants.create,
- tenant_name='oops',
- description="shouldn't work!",
- enabled=True)
- self.assertRaises(exception,
- two.tenants.delete,
- tenant=self.tenant_baz['id'])
-
- # ROLE CRUD
- self.assertRaises(exception,
- two.roles.get,
- role=self.role_admin['id'])
- self.assertRaises(exception,
- two.roles.list)
- self.assertRaises(exception,
- two.roles.create,
- name='oops')
- self.assertRaises(exception,
- two.roles.delete,
- role=self.role_admin['id'])
-
- # TODO(ja): MEMBERSHIP CRUD
- # TODO(ja): determine what else todo
-
-
-class KcMasterTestCase(CompatTestCase, KeystoneClientTests):
- def get_checkout(self):
- return KEYSTONECLIENT_REPO, 'master'
-
- def test_ec2_auth(self):
- client = self.get_client()
- cred = client.ec2.create(user_id=self.user_foo['id'],
- tenant_id=self.tenant_bar['id'])
-
- from keystoneclient.contrib.ec2 import utils as ec2_utils
- signer = ec2_utils.Ec2Signer(cred.secret)
- credentials = {'params': {'SignatureVersion': '2'},
- 'access': cred.access,
- 'verb': 'GET',
- 'host': 'localhost',
- 'path': '/thisisgoingtowork'}
- signature = signer.generate(credentials)
- credentials['signature'] = signature
- url = '%s/ec2tokens' % (client.auth_url)
- (resp, token) = client.request(url=url,
- method='POST',
- body={'credentials': credentials})
- # make sure we have a v2 token
- self.assertEqual(resp.status_code, 200)
- self.assertIn('access', token)
-
- def test_tenant_add_and_remove_user(self):
- client = self.get_client(admin=True)
- client.roles.add_user_role(tenant=self.tenant_bar['id'],
- user=self.user_two['id'],
- role=self.role_other['id'])
- user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
- self.assert_(self.user_two['id'] in [x.id for x in user_refs])
- client.roles.remove_user_role(tenant=self.tenant_bar['id'],
- user=self.user_two['id'],
- role=self.role_other['id'])
- roles = client.roles.roles_for_user(user=self.user_foo['id'],
- tenant=self.tenant_bar['id'])
- self.assertNotIn(self.role_other['id'], roles)
- user_refs = client.tenants.list_users(tenant=self.tenant_bar['id'])
- self.assertNotIn(self.user_two['id'], [x.id for x in user_refs])
-
- def test_user_role_add_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.add_user_role,
- tenant=uuid.uuid4().hex,
- user=self.user_foo['id'],
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.add_user_role,
- tenant=self.tenant_baz['id'],
- user=uuid.uuid4().hex,
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.add_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=uuid.uuid4().hex)
-
- def test_user_role_remove_404(self):
- from keystoneclient import exceptions as client_exceptions
- client = self.get_client(admin=True)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=uuid.uuid4().hex,
- user=self.user_foo['id'],
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=uuid.uuid4().hex,
- role=self.role_member['id'])
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=uuid.uuid4().hex)
- self.assertRaises(client_exceptions.NotFound,
- client.roles.remove_user_role,
- tenant=self.tenant_baz['id'],
- user=self.user_foo['id'],
- role=self.role_member['id'])
-
- def test_tenant_list_marker(self):
- client = self.get_client()
-
- # Add two arbitrary tenants to user for testing purposes
- for i in range(2):
- tenant_id = uuid.uuid4().hex
- tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,
- 'domain_id': DEFAULT_DOMAIN_ID}
- self.identity_api.create_project(tenant_id, tenant)
- self.identity_api.add_user_to_project(tenant_id,
- self.user_foo['id'])
-
- tenants = client.tenants.list()
- self.assertEqual(len(tenants), 3)
-
- tenants_marker = client.tenants.list(marker=tenants[0].id)
- self.assertEqual(len(tenants_marker), 2)
- self.assertEqual(tenants[1].name, tenants_marker[0].name)
- self.assertEqual(tenants[2].name, tenants_marker[1].name)
-
- def test_tenant_list_marker_not_found(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client()
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, marker=uuid.uuid4().hex)
-
- def test_tenant_list_limit(self):
- client = self.get_client()
-
- # Add two arbitrary tenants to user for testing purposes
- for i in range(2):
- tenant_id = uuid.uuid4().hex
- tenant = {'name': 'tenant-%s' % tenant_id, 'id': tenant_id,
- 'domain_id': DEFAULT_DOMAIN_ID}
- self.identity_api.create_project(tenant_id, tenant)
- self.identity_api.add_user_to_project(tenant_id,
- self.user_foo['id'])
-
- tenants = client.tenants.list()
- self.assertEqual(len(tenants), 3)
-
- tenants_limited = client.tenants.list(limit=2)
- self.assertEqual(len(tenants_limited), 2)
- self.assertEqual(tenants[0].name, tenants_limited[0].name)
- self.assertEqual(tenants[1].name, tenants_limited[1].name)
-
- def test_tenant_list_limit_bad_value(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client()
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, limit='a')
- self.assertRaises(client_exceptions.BadRequest,
- client.tenants.list, limit=-1)
-
- def test_roles_get_by_user(self):
- client = self.get_client(admin=True)
- roles = client.roles.roles_for_user(user=self.user_foo['id'],
- tenant=self.tenant_bar['id'])
- self.assertTrue(len(roles) > 0)
-
- def test_user_can_update_passwd(self):
- client = self.get_client(self.user_two)
-
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
-
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
-
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
- self.public_server.application(req.environ,
- responseobject.start_fake_response)
-
- self.user_two['password'] = new_password
- self.get_client(self.user_two)
-
- def test_user_cannot_update_other_users_passwd(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client(self.user_two)
-
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
-
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
-
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_foo['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
- self.public_server.application(req.environ,
- responseobject.start_fake_response)
- self.assertEquals(403, responseobject.response_status)
-
- self.user_two['password'] = new_password
- self.assertRaises(client_exceptions.Unauthorized,
- self.get_client, self.user_two)
-
- def test_tokens_after_user_update_passwd(self):
- from keystoneclient import exceptions as client_exceptions
-
- client = self.get_client(self.user_two)
-
- token_id = client.auth_token
- new_password = uuid.uuid4().hex
-
- # TODO(derekh): Update to use keystoneclient when available
- class FakeResponse(object):
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
- responseobject = FakeResponse()
-
- req = webob.Request.blank(
- '/v2.0/OS-KSCRUD/users/%s' % self.user_two['id'],
- headers={'X-Auth-Token': token_id})
- req.method = 'PATCH'
- req.body = ('{"user":{"password":"%s","original_password":"%s"}}' %
- (new_password, self.user_two['password']))
-
- rv = self.public_server.application(
- req.environ,
- responseobject.start_fake_response)
- response_json = jsonutils.loads(rv.pop())
- new_token_id = response_json['access']['token']['id']
-
- self.assertRaises(client_exceptions.Unauthorized, client.tenants.list)
- client.auth_token = new_token_id
- client.tenants.list()
-
-
-class KcEssex3TestCase(CompatTestCase, KeystoneClientTests):
- def get_checkout(self):
- return KEYSTONECLIENT_REPO, 'essex-3'
-
- def test_tenant_add_and_remove_user(self):
- client = self.get_client(admin=True)
- client.roles.add_user_to_tenant(tenant_id=self.tenant_bar['id'],
- user_id=self.user_two['id'],
- role_id=self.role_member['id'])
- role_refs = client.roles.get_user_role_refs(
- user_id=self.user_two['id'])
- self.assert_(self.tenant_baz['id'] in [x.tenantId for x in role_refs])
-
- # get the "role_refs" so we get the proper id, this is how the clients
- # do it
- roleref_refs = client.roles.get_user_role_refs(
- user_id=self.user_two['id'])
- for roleref_ref in roleref_refs:
- if (roleref_ref.roleId == self.role_member['id']
- and roleref_ref.tenantId == self.tenant_baz['id']):
- # use python's scope fall through to leave roleref_ref set
- break
-
- client.roles.remove_user_from_tenant(tenant_id=self.tenant_bar['id'],
- user_id=self.user_two['id'],
- role_id=roleref_ref.id)
-
- role_refs = client.roles.get_user_role_refs(
- user_id=self.user_two['id'])
- self.assert_(self.tenant_baz['id'] not in
- [x.tenantId for x in role_refs])
-
- def test_roles_get_by_user(self):
- client = self.get_client(admin=True)
- roles = client.roles.get_user_role_refs(user_id='foo')
- self.assertTrue(len(roles) > 0)
-
- def test_role_list_404(self):
- self.skipTest('N/A')
-
- def test_authenticate_and_delete_token(self):
- self.skipTest('N/A')
-
- def test_user_create_update_delete(self):
- from keystoneclient import exceptions as client_exceptions
-
- test_username = 'new_user'
- client = self.get_client(admin=True)
- user = client.users.create(name=test_username,
- password='password',
- email='user1@test.com')
- self.assertEquals(user.name, test_username)
-
- user = client.users.get(user=user.id)
- self.assertEquals(user.name, test_username)
-
- user = client.users.update_email(user=user, email='user2@test.com')
- self.assertEquals(user.email, 'user2@test.com')
-
- # NOTE(termie): update_enabled doesn't return anything, probably a bug
- client.users.update_enabled(user=user, enabled=False)
- user = client.users.get(user.id)
- self.assertFalse(user.enabled)
-
- self.assertRaises(client_exceptions.Unauthorized,
- self._client,
- username=test_username,
- password='password')
- client.users.update_enabled(user, True)
-
- user = client.users.update_password(user=user, password='password2')
-
- self._client(username=test_username,
- password='password2')
-
- user = client.users.update_tenant(user=user, tenant='bar')
- # TODO(ja): once keystonelight supports default tenant
- # when you login without specifying tenant, the
- # token should be scoped to tenant 'bar'
-
- client.users.delete(user.id)
- self.assertRaises(client_exceptions.NotFound, client.users.get,
- user.id)
-
- def test_user_update_404(self):
- self.skipTest('N/A')
-
- def test_endpoint_create_404(self):
- self.skipTest('N/A')
-
- def test_endpoint_delete_404(self):
- self.skipTest('N/A')
-
- def test_policy_crud(self):
- self.skipTest('N/A due to lack of endpoint CRUD')
-
-
-class Kc11TestCase(CompatTestCase, KeystoneClientTests):
- def get_checkout(self):
- return KEYSTONECLIENT_REPO, '0.1.1'
-
- def test_policy_crud(self):
- self.skipTest('N/A')