summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_auth.py7
-rw-r--r--tests/test_backend.py44
-rw-r--r--tests/test_backend_sql.py2
-rw-r--r--tests/test_injection.py44
-rw-r--r--tests/test_keystoneclient.py17
-rw-r--r--tests/test_s3_token_middleware.py58
-rw-r--r--tests/test_sql_migrate_extensions.py47
-rw-r--r--tests/test_sql_upgrade.py146
-rw-r--r--tests/test_token_provider.py1
-rw-r--r--tests/test_v3.py3
-rw-r--r--tests/test_v3_identity.py32
-rw-r--r--tests/test_wsgi.py68
12 files changed, 322 insertions, 147 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py
index 1416269b..e8e6c7a9 100644
--- a/tests/test_auth.py
+++ b/tests/test_auth.py
@@ -179,7 +179,8 @@ class AuthBadRequests(AuthTest):
def test_authenticate_password_too_large(self):
"""Verify sending large 'password' raises the right exception."""
- body_dict = _build_user_auth(username='FOO', password='0' * 8193)
+ length = CONF.identity.max_password_length + 1
+ body_dict = _build_user_auth(username='FOO', password='0' * length)
self.assertRaises(exception.ValidationSizeError,
self.controller.authenticate,
{}, body_dict)
@@ -618,6 +619,10 @@ class AuthWithTrust(AuthTest):
role_ids = [self.role_browser['id'], self.role_member['id']]
self.assertTrue(timeutils.parse_strtime(self.new_trust['expires_at'],
fmt=TIME_FORMAT))
+ self.assertIn('http://localhost:5000/v3/OS-TRUST/',
+ self.new_trust['links']['self'])
+ self.assertIn('http://localhost:5000/v3/OS-TRUST/',
+ self.new_trust['roles_links']['self'])
for role in self.new_trust['roles']:
self.assertIn(role['id'], role_ids)
diff --git a/tests/test_backend.py b/tests/test_backend.py
index a43e92ae..75a94773 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -118,26 +118,25 @@ class IdentityTests(object):
self.assertEqual(unicode_name, ref['name'])
def test_get_project(self):
- tenant_ref = self.identity_api.get_project(
- tenant_id=self.tenant_bar['id'])
+ tenant_ref = self.identity_api.get_project(self.tenant_bar['id'])
self.assertDictEqual(tenant_ref, self.tenant_bar)
def test_get_project_404(self):
self.assertRaises(exception.ProjectNotFound,
self.identity_api.get_project,
- tenant_id=uuid.uuid4().hex)
+ uuid.uuid4().hex)
def test_get_project_by_name(self):
tenant_ref = self.identity_api.get_project_by_name(
- tenant_name=self.tenant_bar['name'],
- domain_id=DEFAULT_DOMAIN_ID)
+ self.tenant_bar['name'],
+ DEFAULT_DOMAIN_ID)
self.assertDictEqual(tenant_ref, self.tenant_bar)
def test_get_project_by_name_404(self):
self.assertRaises(exception.ProjectNotFound,
self.identity_api.get_project_by_name,
- tenant_name=uuid.uuid4().hex,
- domain_id=DEFAULT_DOMAIN_ID)
+ uuid.uuid4().hex,
+ DEFAULT_DOMAIN_ID)
def test_get_project_users(self):
tenant_ref = self.identity_api.get_project_users(self.tenant_baz['id'])
@@ -152,10 +151,10 @@ class IdentityTests(object):
def test_get_project_users_404(self):
self.assertRaises(exception.ProjectNotFound,
self.identity_api.get_project_users,
- tenant_id=uuid.uuid4().hex)
+ uuid.uuid4().hex)
def test_get_user(self):
- user_ref = self.identity_api.get_user(user_id=self.user_foo['id'])
+ user_ref = self.identity_api.get_user(self.user_foo['id'])
# NOTE(termie): the password field is left in user_foo to make
# it easier to authenticate in tests, but should
# not be returned by the api
@@ -165,12 +164,12 @@ class IdentityTests(object):
def test_get_user_404(self):
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
- user_id=uuid.uuid4().hex)
+ uuid.uuid4().hex)
def test_get_user_by_name(self):
user_ref = self.identity_api.get_user_by_name(
- user_name=self.user_foo['name'],
- domain_id=DEFAULT_DOMAIN_ID)
+ self.user_foo['name'], DEFAULT_DOMAIN_ID)
+
# NOTE(termie): the password field is left in user_foo to make
# it easier to authenticate in tests, but should
# not be returned by the api
@@ -180,19 +179,18 @@ class IdentityTests(object):
def test_get_user_by_name_404(self):
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user_by_name,
- user_name=uuid.uuid4().hex,
- domain_id=DEFAULT_DOMAIN_ID)
+ uuid.uuid4().hex,
+ DEFAULT_DOMAIN_ID)
def test_get_role(self):
- role_ref = self.identity_api.get_role(
- role_id=self.role_admin['id'])
+ role_ref = self.identity_api.get_role(self.role_admin['id'])
role_ref_dict = dict((x, role_ref[x]) for x in role_ref)
self.assertDictEqual(role_ref_dict, self.role_admin)
def test_get_role_404(self):
self.assertRaises(exception.RoleNotFound,
self.identity_api.get_role,
- role_id=uuid.uuid4().hex)
+ uuid.uuid4().hex)
def test_create_duplicate_role_name_fails(self):
role = {'id': 'fake1',
@@ -1455,6 +1453,18 @@ class IdentityTests(object):
tenants = self.identity_api.get_projects_for_user(self.user_foo['id'])
self.assertIn(self.tenant_baz['id'], tenants)
+ def test_add_user_to_project_missing_default_role(self):
+ self.assignment_api.delete_role(CONF.member_role_id)
+ self.assertRaises(exception.RoleNotFound,
+ self.assignment_api.get_role,
+ CONF.member_role_id)
+ self.identity_api.add_user_to_project(self.tenant_baz['id'],
+ self.user_foo['id'])
+ tenants = self.identity_api.get_projects_for_user(self.user_foo['id'])
+ self.assertIn(self.tenant_baz['id'], tenants)
+ default_role = self.assignment_api.get_role(CONF.member_role_id)
+ self.assertIsNotNone(default_role)
+
def test_add_user_to_project_404(self):
self.assertRaises(exception.ProjectNotFound,
self.identity_api.add_user_to_project,
diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py
index 38eddaa4..89276e86 100644
--- a/tests/test_backend_sql.py
+++ b/tests/test_backend_sql.py
@@ -114,7 +114,7 @@ class SqlModels(SqlTests):
def test_role_model(self):
cols = (('id', sql.String, 64),
- ('name', sql.String, 64))
+ ('name', sql.String, 255))
self.assertExpectedSchema('role', cols)
def test_user_project_metadata_model(self):
diff --git a/tests/test_injection.py b/tests/test_injection.py
index 4b6fc8ba..36cd0126 100644
--- a/tests/test_injection.py
+++ b/tests/test_injection.py
@@ -21,6 +21,10 @@ from keystone.common import dependency
class TestDependencyInjection(unittest.TestCase):
+ def tearDown(self):
+ dependency.reset()
+ super(TestDependencyInjection, self).tearDown()
+
def test_dependency_injection(self):
class Interface(object):
def do_work(self):
@@ -165,3 +169,43 @@ class TestDependencyInjection(unittest.TestCase):
with self.assertRaises(dependency.UnresolvableDependencyException):
Consumer()
+ dependency.resolve_future_dependencies()
+
+ def test_circular_dependency(self):
+ p1_name = uuid.uuid4().hex
+ p2_name = uuid.uuid4().hex
+
+ @dependency.provider(p1_name)
+ @dependency.requires(p2_name)
+ class P1(object):
+ pass
+
+ @dependency.provider(p2_name)
+ @dependency.requires(p1_name)
+ class P2(object):
+ pass
+
+ p1 = P1()
+ p2 = P2()
+
+ dependency.resolve_future_dependencies()
+
+ self.assertIs(getattr(p1, p2_name), p2)
+ self.assertIs(getattr(p2, p1_name), p1)
+
+ def test_reset(self):
+ # Can reset the registry of providers.
+
+ p_id = uuid.uuid4().hex
+
+ @dependency.provider(p_id)
+ class P(object):
+ pass
+
+ p_inst = P()
+
+ self.assertIs(dependency.REGISTRY[p_id], p_inst)
+
+ dependency.reset()
+
+ self.assertFalse(dependency.REGISTRY)
diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py
index ecab6a01..38062d4b 100644
--- a/tests/test_keystoneclient.py
+++ b/tests/test_keystoneclient.py
@@ -18,7 +18,6 @@ import uuid
import webob
from keystone import test
-from keystone import token
from keystone import config
from keystone.openstack.common import jsonutils
@@ -36,15 +35,10 @@ class CompatTestCase(test.TestCase):
def setUp(self):
super(CompatTestCase, self).setUp()
- self.public_server = self.serveapp('keystone', name='main')
- self.admin_server = self.serveapp('keystone', name='admin')
-
- revdir = test.checkout_vendor(*self.get_checkout())
- self.add_path(revdir)
- self.clear_module('keystoneclient')
+ # The backends should be loaded and initialized before the servers are
+ # started because the servers use the backends.
self.load_backends()
- self.token_provider_api = token.provider.Manager()
self.load_fixtures(default_fixtures)
# TODO(termie): add an admin user to the fixtures and use that user
@@ -54,6 +48,13 @@ class CompatTestCase(test.TestCase):
self.tenant_bar['id'],
self.role_admin['id'])
+ self.public_server = self.serveapp('keystone', name='main')
+ self.admin_server = self.serveapp('keystone', name='admin')
+
+ revdir = test.checkout_vendor(*self.get_checkout())
+ self.add_path(revdir)
+ self.clear_module('keystoneclient')
+
def tearDown(self):
self.public_server.kill()
self.admin_server.kill()
diff --git a/tests/test_s3_token_middleware.py b/tests/test_s3_token_middleware.py
index c4d56dc9..ec31f2ac 100644
--- a/tests/test_s3_token_middleware.py
+++ b/tests/test_s3_token_middleware.py
@@ -14,32 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
-import logging
-
-import stubout
import unittest2 as unittest
import webob
-from swift.common import utils as swift_utils
-
from keystone.middleware import s3_token
from keystone.openstack.common import jsonutils
-def setUpModule(self):
- self.stubs = stubout.StubOutForTesting()
- # Stub out swift_utils.get_logger. get_logger tries to configure
- # syslogging to '/dev/log', which will fail on OS X.
-
- def fake_get_logger(config, log_route=None):
- return logging.getLogger(log_route)
- self.stubs.Set(swift_utils, 'get_logger', fake_get_logger)
-
-
-def tearDownModule(self):
- self.stubs.UnsetAll()
-
-
class FakeHTTPResponse(object):
def __init__(self, status, body):
self.status = status
@@ -211,3 +192,42 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
s3_invalid_req = self.middleware.deny_request('InvalidURI')
self.assertEqual(resp.body, s3_invalid_req.body)
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
+
+
+class S3TokenMiddlewareTestUtil(unittest.TestCase):
+ def test_split_path_failed(self):
+ self.assertRaises(ValueError, s3_token.split_path, '')
+ self.assertRaises(ValueError, s3_token.split_path, '/')
+ self.assertRaises(ValueError, s3_token.split_path, '//')
+ self.assertRaises(ValueError, s3_token.split_path, '//a')
+ self.assertRaises(ValueError, s3_token.split_path, '/a/c')
+ self.assertRaises(ValueError, s3_token.split_path, '//c')
+ self.assertRaises(ValueError, s3_token.split_path, '/a/c/')
+ self.assertRaises(ValueError, s3_token.split_path, '/a//')
+ self.assertRaises(ValueError, s3_token.split_path, '/a', 2)
+ self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3)
+ self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3, True)
+ self.assertRaises(ValueError, s3_token.split_path, '/a/c/o/r', 3, 3)
+ self.assertRaises(ValueError, s3_token.split_path, '/a', 5, 4)
+
+ def test_split_path_success(self):
+ self.assertEquals(s3_token.split_path('/a'), ['a'])
+ self.assertEquals(s3_token.split_path('/a/'), ['a'])
+ self.assertEquals(s3_token.split_path('/a/c', 2), ['a', 'c'])
+ self.assertEquals(s3_token.split_path('/a/c/o', 3), ['a', 'c', 'o'])
+ self.assertEquals(s3_token.split_path('/a/c/o/r', 3, 3, True),
+ ['a', 'c', 'o/r'])
+ self.assertEquals(s3_token.split_path('/a/c', 2, 3, True),
+ ['a', 'c', None])
+ self.assertEquals(s3_token.split_path('/a/c/', 2), ['a', 'c'])
+ self.assertEquals(s3_token.split_path('/a/c/', 2, 3), ['a', 'c', ''])
+
+ def test_split_path_invalid_path(self):
+ try:
+ s3_token.split_path('o\nn e', 2)
+ except ValueError, err:
+ self.assertEquals(str(err), 'Invalid path: o%0An%20e')
+ try:
+ s3_token.split_path('o\nn e', 2, 3, True)
+ except ValueError, err:
+ self.assertEquals(str(err), 'Invalid path: o%0An%20e')
diff --git a/tests/test_sql_migrate_extensions.py b/tests/test_sql_migrate_extensions.py
new file mode 100644
index 00000000..4a529559
--- /dev/null
+++ b/tests/test_sql_migrate_extensions.py
@@ -0,0 +1,47 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 OpenStack LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+To run these tests against a live database:
+1. Modify the file `tests/backend_sql.conf` to use the connection for your
+ live database
+2. Set up a blank, live database.
+3. run the tests using
+ ./run_tests.sh -N test_sql_upgrade
+ WARNING::
+ Your database will be wiped.
+ Do not do this against a Database with valuable data as
+ all data will be lost.
+"""
+
+from keystone.contrib import example
+
+import test_sql_upgrade
+
+
+class SqlUpgradeExampleExtension(test_sql_upgrade.SqlMigrateBase):
+ def repo_package(self):
+ return example
+
+ def test_upgrade(self):
+ self.assertTableDoesNotExist('example')
+ self.upgrade(1, repository=self.repo_path)
+ self.assertTableColumns('example', ['id', 'type', 'extra'])
+
+ def test_downgrade(self):
+ self.upgrade(1, repository=self.repo_path)
+ self.assertTableColumns('example', ['id', 'type', 'extra'])
+ self.downgrade(0, repository=self.repo_path)
+ self.assertTableDoesNotExist('example')
diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py
index e6d6162b..9540c4cd 100644
--- a/tests/test_sql_upgrade.py
+++ b/tests/test_sql_upgrade.py
@@ -45,8 +45,7 @@ CONF = config.CONF
DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
-class SqlUpgradeTests(test.TestCase):
-
+class SqlMigrateBase(test.TestCase):
def initialize_sql(self):
self.metadata = sqlalchemy.MetaData()
self.metadata.bind = self.engine
@@ -55,12 +54,15 @@ class SqlUpgradeTests(test.TestCase):
test.testsdir('test_overrides.conf'),
test.testsdir('backend_sql.conf')]
- #override this to sepcify the complete list of configuration files
+ #override this to specify the complete list of configuration files
def config_files(self):
return self._config_file_list
+ def repo_package(self):
+ return None
+
def setUp(self):
- super(SqlUpgradeTests, self).setUp()
+ super(SqlMigrateBase, self).setUp()
self.config(self.config_files())
self.base = sql.Base()
@@ -71,7 +73,7 @@ class SqlUpgradeTests(test.TestCase):
autocommit=False)
self.initialize_sql()
- self.repo_path = migration._find_migrate_repo()
+ self.repo_path = migration.find_migrate_repo(self.repo_package())
self.schema = versioning_api.ControlledSchema.create(
self.engine,
self.repo_path, 0)
@@ -85,7 +87,64 @@ class SqlUpgradeTests(test.TestCase):
autoload=True)
self.downgrade(0)
table.drop(self.engine, checkfirst=True)
- super(SqlUpgradeTests, self).tearDown()
+ super(SqlMigrateBase, self).tearDown()
+
+ def select_table(self, name):
+ table = sqlalchemy.Table(name,
+ self.metadata,
+ autoload=True)
+ s = sqlalchemy.select([table])
+ return s
+
+ def assertTableExists(self, table_name):
+ try:
+ self.select_table(table_name)
+ except sqlalchemy.exc.NoSuchTableError:
+ raise AssertionError('Table "%s" does not exist' % table_name)
+
+ def assertTableDoesNotExist(self, table_name):
+ """Asserts that a given table exists cannot be selected by name."""
+ # Switch to a different metadata otherwise you might still
+ # detect renamed or dropped tables
+ try:
+ temp_metadata = sqlalchemy.MetaData()
+ temp_metadata.bind = self.engine
+ sqlalchemy.Table(table_name, temp_metadata, autoload=True)
+ except sqlalchemy.exc.NoSuchTableError:
+ pass
+ else:
+ raise AssertionError('Table "%s" already exists' % table_name)
+
+ def upgrade(self, *args, **kwargs):
+ self._migrate(*args, **kwargs)
+
+ def downgrade(self, *args, **kwargs):
+ self._migrate(*args, downgrade=True, **kwargs)
+
+ def _migrate(self, version, repository=None, downgrade=False,
+ current_schema=None):
+ repository = repository or self.repo_path
+ err = ''
+ version = versioning_api._migrate_version(self.schema,
+ version,
+ not downgrade,
+ err)
+ if not current_schema:
+ current_schema = self.schema
+ changeset = current_schema.changeset(version)
+ for ver, change in changeset:
+ self.schema.runchange(ver, change, changeset.step)
+ self.assertEqual(self.schema.version, version)
+
+ def assertTableColumns(self, table_name, expected_cols):
+ """Asserts that the table contains the expected set of columns."""
+ self.initialize_sql()
+ table = self.select_table(table_name)
+ actual_cols = [col.name for col in table.columns]
+ self.assertEqual(expected_cols, actual_cols, '%s table' % table_name)
+
+
+class SqlUpgradeTests(SqlMigrateBase):
def test_blank_db_to_start(self):
self.assertTableDoesNotExist('user')
@@ -108,13 +167,6 @@ class SqlUpgradeTests(test.TestCase):
self.downgrade(x - 1)
self.upgrade(x)
- def assertTableColumns(self, table_name, expected_cols):
- """Asserts that the table contains the expected set of columns."""
- self.initialize_sql()
- table = self.select_table(table_name)
- actual_cols = [col.name for col in table.columns]
- self.assertEqual(expected_cols, actual_cols, '%s table' % table_name)
-
def test_upgrade_add_initial_tables(self):
self.upgrade(1)
self.assertTableColumns("user", ["id", "name", "extra"])
@@ -594,6 +646,22 @@ class SqlUpgradeTests(test.TestCase):
'data': json.dumps({"roles": [role2['id']]})}
session.execute(metadata_table.insert().values(role_grant))
+ # Create another user to test the case where member_role_id is already
+ # assigned.
+ user2 = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'domain_id': domain['id'],
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': json.dumps({})}
+ session.execute(user_table.insert().values(user2))
+
+ # Grant CONF.member_role_id to User2
+ role_grant = {'user_id': user2['id'],
+ 'tenant_id': project['id'],
+ 'data': json.dumps({"roles": [CONF.member_role_id]})}
+ session.execute(metadata_table.insert().values(role_grant))
+
session.commit()
self.upgrade(17)
@@ -635,6 +703,14 @@ class SqlUpgradeTests(test.TestCase):
'data': data}
cmd = user_project_metadata_table.insert().values(role_grant)
self.engine.execute(cmd)
+
+ # Create another conflicting user-project for User2
+ data = json.dumps({"roles": [role2['id']]})
+ role_grant = {'user_id': user2['id'],
+ 'project_id': project['id'],
+ 'data': data}
+ cmd = user_project_metadata_table.insert().values(role_grant)
+ self.engine.execute(cmd)
# End Scaffolding
session.commit()
@@ -1278,50 +1354,6 @@ class SqlUpgradeTests(test.TestCase):
'extra': json.dumps(extra)})
self.engine.execute(ins)
- def select_table(self, name):
- table = sqlalchemy.Table(name,
- self.metadata,
- autoload=True)
- s = sqlalchemy.select([table])
- return s
-
- def assertTableExists(self, table_name):
- try:
- self.select_table(table_name)
- except sqlalchemy.exc.NoSuchTableError:
- raise AssertionError('Table "%s" does not exist' % table_name)
-
- def assertTableDoesNotExist(self, table_name):
- """Asserts that a given table exists cannot be selected by name."""
- # Switch to a different metadata otherwise you might still
- # detect renamed or dropped tables
- try:
- temp_metadata = sqlalchemy.MetaData()
- temp_metadata.bind = self.engine
- sqlalchemy.Table(table_name, temp_metadata, autoload=True)
- except sqlalchemy.exc.NoSuchTableError:
- pass
- else:
- raise AssertionError('Table "%s" already exists' % table_name)
-
- def upgrade(self, *args, **kwargs):
- self._migrate(*args, **kwargs)
-
- def downgrade(self, *args, **kwargs):
- self._migrate(*args, downgrade=True, **kwargs)
-
- def _migrate(self, version, repository=None, downgrade=False):
- repository = repository or self.repo_path
- err = ''
- version = versioning_api._migrate_version(self.schema,
- version,
- not downgrade,
- err)
- changeset = self.schema.changeset(version)
- for ver, change in changeset:
- self.schema.runchange(ver, change, changeset.step)
- self.assertEqual(self.schema.version, version)
-
def _mysql_check_all_tables_innodb(self):
database = self.engine.url.database
diff --git a/tests/test_token_provider.py b/tests/test_token_provider.py
index ac0b0d6b..a7e92717 100644
--- a/tests/test_token_provider.py
+++ b/tests/test_token_provider.py
@@ -333,7 +333,6 @@ class TestTokenProvider(test.TestCase):
def setUp(self):
super(TestTokenProvider, self).setUp()
self.load_backends()
- self.token_provider_api = token.provider.Manager()
def test_get_token_version(self):
self.assertEqual(
diff --git a/tests/test_v3.py b/tests/test_v3.py
index 60a52d69..4f00de7d 100644
--- a/tests/test_v3.py
+++ b/tests/test_v3.py
@@ -5,7 +5,6 @@ from lxml import etree
import webtest
from keystone import test
-from keystone import token
from keystone import auth
from keystone.common import serializer
@@ -45,8 +44,6 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
test.setup_test_database()
self.load_backends()
- self.token_provider_api = token.provider.Manager()
-
self.public_app = webtest.TestApp(
self.loadapp('keystone', name='main'))
self.admin_app = webtest.TestApp(
diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py
index 5eaf9085..f1e19c42 100644
--- a/tests/test_v3_identity.py
+++ b/tests/test_v3_identity.py
@@ -105,6 +105,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'domain': ref})
return self.assertValidDomainResponse(r, ref)
+ def test_create_domain_400(self):
+ """Call ``POST /domains``."""
+ self.post('/domains', body={'domain': {}}, expected_status=400)
+
def test_list_domains(self):
"""Call ``GET /domains``."""
r = self.get('/domains')
@@ -266,19 +270,19 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Check all the domain2 relevant entities are gone
self.assertRaises(exception.DomainNotFound,
self.identity_api.get_domain,
- domain_id=self.domain2['id'])
+ self.domain2['id'])
self.assertRaises(exception.ProjectNotFound,
self.identity_api.get_project,
- tenant_id=self.project2['id'])
+ self.project2['id'])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group,
- group_id=self.group2['id'])
+ self.group2['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
- user_id=self.user2['id'])
+ self.user2['id'])
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
- credential_id=self.credential2['id'])
+ self.credential2['id'])
# ...and that all self.domain entities are still here
r = self.identity_api.get_domain(self.domain['id'])
@@ -313,6 +317,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'project': ref})
self.assertValidProjectResponse(r, ref)
+ def test_create_project_400(self):
+ """Call ``POST /projects``."""
+ self.post('/projects', body={'project': {}}, expected_status=400)
+
def test_get_project(self):
"""Call ``GET /projects/{project_id}``."""
r = self.get(
@@ -376,6 +384,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'user': ref})
return self.assertValidUserResponse(r, ref)
+ def test_create_user_400(self):
+ """Call ``POST /users``."""
+ self.post('/users', body={'user': {}}, expected_status=400)
+
def test_list_users(self):
"""Call ``GET /users``."""
r = self.get('/users')
@@ -511,7 +523,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# that reference this project
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
- credential_id=self.credential['id'])
+ self.credential['id'])
# And the no tokens we remain valid
tokens = self.token_api.list_tokens(self.user['id'])
self.assertEquals(len(tokens), 0)
@@ -529,6 +541,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'group': ref})
return self.assertValidGroupResponse(r, ref)
+ def test_create_group_400(self):
+ """Call ``POST /groups``."""
+ self.post('/groups', body={'group': {}}, expected_status=400)
+
def test_list_groups(self):
"""Call ``GET /groups``."""
r = self.get('/groups')
@@ -569,6 +585,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'role': ref})
return self.assertValidRoleResponse(r, ref)
+ def test_create_role_400(self):
+ """Call ``POST /roles``."""
+ self.post('/roles', body={'role': {}}, expected_status=400)
+
def test_list_roles(self):
"""Call ``GET /roles``."""
r = self.get('/roles')
diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py
index 369dd952..362df922 100644
--- a/tests/test_wsgi.py
+++ b/tests/test_wsgi.py
@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import webob
-
from keystone import test
from keystone.common import wsgi
@@ -34,42 +32,11 @@ class BaseWSGITest(test.TestCase):
super(BaseWSGITest, self).setUp()
def _make_request(self, url='/'):
- req = webob.Request.blank(url)
+ req = wsgi.Request.blank(url)
args = {'action': 'index', 'controller': None}
req.environ['wsgiorg.routing_args'] = [None, args]
return req
- def test_mask_password(self):
- message = ("test = 'password': 'aaaaaa', 'param1': 'value1', "
- "\"new_password\": 'bbbbbb'")
- self.assertEqual(wsgi.mask_password(message, True),
- u"test = 'password': '***', 'param1': 'value1', "
- "\"new_password\": '***'")
-
- message = "test = 'password' : 'aaaaaa'"
- self.assertEqual(wsgi.mask_password(message, False, '111'),
- "test = 'password' : '111'")
-
- message = u"test = u'password' : u'aaaaaa'"
- self.assertEqual(wsgi.mask_password(message, True),
- u"test = u'password' : u'***'")
-
- message = 'test = "password" : "aaaaaaaaa"'
- self.assertEqual(wsgi.mask_password(message),
- 'test = "password" : "***"')
-
- message = 'test = "original_password" : "aaaaaaaaa"'
- self.assertEqual(wsgi.mask_password(message),
- 'test = "original_password" : "***"')
-
- message = 'test = "original_password" : ""'
- self.assertEqual(wsgi.mask_password(message),
- 'test = "original_password" : "***"')
-
- message = 'test = "param1" : "value"'
- self.assertEqual(wsgi.mask_password(message),
- 'test = "param1" : "value"')
-
class ApplicationTest(BaseWSGITest):
def test_response_content_type(self):
@@ -212,3 +179,36 @@ class MiddlewareTest(BaseWSGITest):
app = factory(self.app)
self.assertIn("testkey", app.kwargs)
self.assertEquals("test", app.kwargs["testkey"])
+
+
+class WSGIFunctionTest(test.TestCase):
+ def test_mask_password(self):
+ message = ("test = 'password': 'aaaaaa', 'param1': 'value1', "
+ "\"new_password\": 'bbbbbb'")
+ self.assertEqual(wsgi.mask_password(message, True),
+ u"test = 'password': '***', 'param1': 'value1', "
+ "\"new_password\": '***'")
+
+ message = "test = 'password' : 'aaaaaa'"
+ self.assertEqual(wsgi.mask_password(message, False, '111'),
+ "test = 'password' : '111'")
+
+ message = u"test = u'password' : u'aaaaaa'"
+ self.assertEqual(wsgi.mask_password(message, True),
+ u"test = u'password' : u'***'")
+
+ message = 'test = "password" : "aaaaaaaaa"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "password" : "***"')
+
+ message = 'test = "original_password" : "aaaaaaaaa"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "original_password" : "***"')
+
+ message = 'test = "original_password" : ""'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "original_password" : "***"')
+
+ message = 'test = "param1" : "value"'
+ self.assertEqual(wsgi.mask_password(message),
+ 'test = "param1" : "value"')