diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_auth.py | 7 | ||||
-rw-r--r-- | tests/test_backend.py | 44 | ||||
-rw-r--r-- | tests/test_backend_sql.py | 2 | ||||
-rw-r--r-- | tests/test_injection.py | 44 | ||||
-rw-r--r-- | tests/test_keystoneclient.py | 17 | ||||
-rw-r--r-- | tests/test_s3_token_middleware.py | 58 | ||||
-rw-r--r-- | tests/test_sql_migrate_extensions.py | 47 | ||||
-rw-r--r-- | tests/test_sql_upgrade.py | 146 | ||||
-rw-r--r-- | tests/test_token_provider.py | 1 | ||||
-rw-r--r-- | tests/test_v3.py | 3 | ||||
-rw-r--r-- | tests/test_v3_identity.py | 32 | ||||
-rw-r--r-- | tests/test_wsgi.py | 68 |
12 files changed, 322 insertions, 147 deletions
diff --git a/tests/test_auth.py b/tests/test_auth.py index 1416269b..e8e6c7a9 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -179,7 +179,8 @@ class AuthBadRequests(AuthTest): def test_authenticate_password_too_large(self): """Verify sending large 'password' raises the right exception.""" - body_dict = _build_user_auth(username='FOO', password='0' * 8193) + length = CONF.identity.max_password_length + 1 + body_dict = _build_user_auth(username='FOO', password='0' * length) self.assertRaises(exception.ValidationSizeError, self.controller.authenticate, {}, body_dict) @@ -618,6 +619,10 @@ class AuthWithTrust(AuthTest): role_ids = [self.role_browser['id'], self.role_member['id']] self.assertTrue(timeutils.parse_strtime(self.new_trust['expires_at'], fmt=TIME_FORMAT)) + self.assertIn('http://localhost:5000/v3/OS-TRUST/', + self.new_trust['links']['self']) + self.assertIn('http://localhost:5000/v3/OS-TRUST/', + self.new_trust['roles_links']['self']) for role in self.new_trust['roles']: self.assertIn(role['id'], role_ids) diff --git a/tests/test_backend.py b/tests/test_backend.py index a43e92ae..75a94773 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -118,26 +118,25 @@ class IdentityTests(object): self.assertEqual(unicode_name, ref['name']) def test_get_project(self): - tenant_ref = self.identity_api.get_project( - tenant_id=self.tenant_bar['id']) + tenant_ref = self.identity_api.get_project(self.tenant_bar['id']) self.assertDictEqual(tenant_ref, self.tenant_bar) def test_get_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project, - tenant_id=uuid.uuid4().hex) + uuid.uuid4().hex) def test_get_project_by_name(self): tenant_ref = self.identity_api.get_project_by_name( - tenant_name=self.tenant_bar['name'], - domain_id=DEFAULT_DOMAIN_ID) + self.tenant_bar['name'], + DEFAULT_DOMAIN_ID) self.assertDictEqual(tenant_ref, self.tenant_bar) def test_get_project_by_name_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project_by_name, - tenant_name=uuid.uuid4().hex, - domain_id=DEFAULT_DOMAIN_ID) + uuid.uuid4().hex, + DEFAULT_DOMAIN_ID) def test_get_project_users(self): tenant_ref = self.identity_api.get_project_users(self.tenant_baz['id']) @@ -152,10 +151,10 @@ class IdentityTests(object): def test_get_project_users_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project_users, - tenant_id=uuid.uuid4().hex) + uuid.uuid4().hex) def test_get_user(self): - user_ref = self.identity_api.get_user(user_id=self.user_foo['id']) + user_ref = self.identity_api.get_user(self.user_foo['id']) # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api @@ -165,12 +164,12 @@ class IdentityTests(object): def test_get_user_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_user, - user_id=uuid.uuid4().hex) + uuid.uuid4().hex) def test_get_user_by_name(self): user_ref = self.identity_api.get_user_by_name( - user_name=self.user_foo['name'], - domain_id=DEFAULT_DOMAIN_ID) + self.user_foo['name'], DEFAULT_DOMAIN_ID) + # NOTE(termie): the password field is left in user_foo to make # it easier to authenticate in tests, but should # not be returned by the api @@ -180,19 +179,18 @@ class IdentityTests(object): def test_get_user_by_name_404(self): self.assertRaises(exception.UserNotFound, self.identity_api.get_user_by_name, - user_name=uuid.uuid4().hex, - domain_id=DEFAULT_DOMAIN_ID) + uuid.uuid4().hex, + DEFAULT_DOMAIN_ID) def test_get_role(self): - role_ref = self.identity_api.get_role( - role_id=self.role_admin['id']) + role_ref = self.identity_api.get_role(self.role_admin['id']) role_ref_dict = dict((x, role_ref[x]) for x in role_ref) self.assertDictEqual(role_ref_dict, self.role_admin) def test_get_role_404(self): self.assertRaises(exception.RoleNotFound, self.identity_api.get_role, - role_id=uuid.uuid4().hex) + uuid.uuid4().hex) def test_create_duplicate_role_name_fails(self): role = {'id': 'fake1', @@ -1455,6 +1453,18 @@ class IdentityTests(object): tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) self.assertIn(self.tenant_baz['id'], tenants) + def test_add_user_to_project_missing_default_role(self): + self.assignment_api.delete_role(CONF.member_role_id) + self.assertRaises(exception.RoleNotFound, + self.assignment_api.get_role, + CONF.member_role_id) + self.identity_api.add_user_to_project(self.tenant_baz['id'], + self.user_foo['id']) + tenants = self.identity_api.get_projects_for_user(self.user_foo['id']) + self.assertIn(self.tenant_baz['id'], tenants) + default_role = self.assignment_api.get_role(CONF.member_role_id) + self.assertIsNotNone(default_role) + def test_add_user_to_project_404(self): self.assertRaises(exception.ProjectNotFound, self.identity_api.add_user_to_project, diff --git a/tests/test_backend_sql.py b/tests/test_backend_sql.py index 38eddaa4..89276e86 100644 --- a/tests/test_backend_sql.py +++ b/tests/test_backend_sql.py @@ -114,7 +114,7 @@ class SqlModels(SqlTests): def test_role_model(self): cols = (('id', sql.String, 64), - ('name', sql.String, 64)) + ('name', sql.String, 255)) self.assertExpectedSchema('role', cols) def test_user_project_metadata_model(self): diff --git a/tests/test_injection.py b/tests/test_injection.py index 4b6fc8ba..36cd0126 100644 --- a/tests/test_injection.py +++ b/tests/test_injection.py @@ -21,6 +21,10 @@ from keystone.common import dependency class TestDependencyInjection(unittest.TestCase): + def tearDown(self): + dependency.reset() + super(TestDependencyInjection, self).tearDown() + def test_dependency_injection(self): class Interface(object): def do_work(self): @@ -165,3 +169,43 @@ class TestDependencyInjection(unittest.TestCase): with self.assertRaises(dependency.UnresolvableDependencyException): Consumer() + dependency.resolve_future_dependencies() + + def test_circular_dependency(self): + p1_name = uuid.uuid4().hex + p2_name = uuid.uuid4().hex + + @dependency.provider(p1_name) + @dependency.requires(p2_name) + class P1(object): + pass + + @dependency.provider(p2_name) + @dependency.requires(p1_name) + class P2(object): + pass + + p1 = P1() + p2 = P2() + + dependency.resolve_future_dependencies() + + self.assertIs(getattr(p1, p2_name), p2) + self.assertIs(getattr(p2, p1_name), p1) + + def test_reset(self): + # Can reset the registry of providers. + + p_id = uuid.uuid4().hex + + @dependency.provider(p_id) + class P(object): + pass + + p_inst = P() + + self.assertIs(dependency.REGISTRY[p_id], p_inst) + + dependency.reset() + + self.assertFalse(dependency.REGISTRY) diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py index ecab6a01..38062d4b 100644 --- a/tests/test_keystoneclient.py +++ b/tests/test_keystoneclient.py @@ -18,7 +18,6 @@ import uuid import webob from keystone import test -from keystone import token from keystone import config from keystone.openstack.common import jsonutils @@ -36,15 +35,10 @@ class CompatTestCase(test.TestCase): def setUp(self): super(CompatTestCase, self).setUp() - self.public_server = self.serveapp('keystone', name='main') - self.admin_server = self.serveapp('keystone', name='admin') - - revdir = test.checkout_vendor(*self.get_checkout()) - self.add_path(revdir) - self.clear_module('keystoneclient') + # The backends should be loaded and initialized before the servers are + # started because the servers use the backends. self.load_backends() - self.token_provider_api = token.provider.Manager() self.load_fixtures(default_fixtures) # TODO(termie): add an admin user to the fixtures and use that user @@ -54,6 +48,13 @@ class CompatTestCase(test.TestCase): self.tenant_bar['id'], self.role_admin['id']) + self.public_server = self.serveapp('keystone', name='main') + self.admin_server = self.serveapp('keystone', name='admin') + + revdir = test.checkout_vendor(*self.get_checkout()) + self.add_path(revdir) + self.clear_module('keystoneclient') + def tearDown(self): self.public_server.kill() self.admin_server.kill() diff --git a/tests/test_s3_token_middleware.py b/tests/test_s3_token_middleware.py index c4d56dc9..ec31f2ac 100644 --- a/tests/test_s3_token_middleware.py +++ b/tests/test_s3_token_middleware.py @@ -14,32 +14,13 @@ # License for the specific language governing permissions and limitations # under the License. -import logging - -import stubout import unittest2 as unittest import webob -from swift.common import utils as swift_utils - from keystone.middleware import s3_token from keystone.openstack.common import jsonutils -def setUpModule(self): - self.stubs = stubout.StubOutForTesting() - # Stub out swift_utils.get_logger. get_logger tries to configure - # syslogging to '/dev/log', which will fail on OS X. - - def fake_get_logger(config, log_route=None): - return logging.getLogger(log_route) - self.stubs.Set(swift_utils, 'get_logger', fake_get_logger) - - -def tearDownModule(self): - self.stubs.UnsetAll() - - class FakeHTTPResponse(object): def __init__(self, status, body): self.status = status @@ -211,3 +192,42 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): s3_invalid_req = self.middleware.deny_request('InvalidURI') self.assertEqual(resp.body, s3_invalid_req.body) self.assertEqual(resp.status_int, s3_invalid_req.status_int) + + +class S3TokenMiddlewareTestUtil(unittest.TestCase): + def test_split_path_failed(self): + self.assertRaises(ValueError, s3_token.split_path, '') + self.assertRaises(ValueError, s3_token.split_path, '/') + self.assertRaises(ValueError, s3_token.split_path, '//') + self.assertRaises(ValueError, s3_token.split_path, '//a') + self.assertRaises(ValueError, s3_token.split_path, '/a/c') + self.assertRaises(ValueError, s3_token.split_path, '//c') + self.assertRaises(ValueError, s3_token.split_path, '/a/c/') + self.assertRaises(ValueError, s3_token.split_path, '/a//') + self.assertRaises(ValueError, s3_token.split_path, '/a', 2) + self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3) + self.assertRaises(ValueError, s3_token.split_path, '/a', 2, 3, True) + self.assertRaises(ValueError, s3_token.split_path, '/a/c/o/r', 3, 3) + self.assertRaises(ValueError, s3_token.split_path, '/a', 5, 4) + + def test_split_path_success(self): + self.assertEquals(s3_token.split_path('/a'), ['a']) + self.assertEquals(s3_token.split_path('/a/'), ['a']) + self.assertEquals(s3_token.split_path('/a/c', 2), ['a', 'c']) + self.assertEquals(s3_token.split_path('/a/c/o', 3), ['a', 'c', 'o']) + self.assertEquals(s3_token.split_path('/a/c/o/r', 3, 3, True), + ['a', 'c', 'o/r']) + self.assertEquals(s3_token.split_path('/a/c', 2, 3, True), + ['a', 'c', None]) + self.assertEquals(s3_token.split_path('/a/c/', 2), ['a', 'c']) + self.assertEquals(s3_token.split_path('/a/c/', 2, 3), ['a', 'c', '']) + + def test_split_path_invalid_path(self): + try: + s3_token.split_path('o\nn e', 2) + except ValueError, err: + self.assertEquals(str(err), 'Invalid path: o%0An%20e') + try: + s3_token.split_path('o\nn e', 2, 3, True) + except ValueError, err: + self.assertEquals(str(err), 'Invalid path: o%0An%20e') diff --git a/tests/test_sql_migrate_extensions.py b/tests/test_sql_migrate_extensions.py new file mode 100644 index 00000000..4a529559 --- /dev/null +++ b/tests/test_sql_migrate_extensions.py @@ -0,0 +1,47 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +To run these tests against a live database: +1. Modify the file `tests/backend_sql.conf` to use the connection for your + live database +2. Set up a blank, live database. +3. run the tests using + ./run_tests.sh -N test_sql_upgrade + WARNING:: + Your database will be wiped. + Do not do this against a Database with valuable data as + all data will be lost. +""" + +from keystone.contrib import example + +import test_sql_upgrade + + +class SqlUpgradeExampleExtension(test_sql_upgrade.SqlMigrateBase): + def repo_package(self): + return example + + def test_upgrade(self): + self.assertTableDoesNotExist('example') + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('example', ['id', 'type', 'extra']) + + def test_downgrade(self): + self.upgrade(1, repository=self.repo_path) + self.assertTableColumns('example', ['id', 'type', 'extra']) + self.downgrade(0, repository=self.repo_path) + self.assertTableDoesNotExist('example') diff --git a/tests/test_sql_upgrade.py b/tests/test_sql_upgrade.py index e6d6162b..9540c4cd 100644 --- a/tests/test_sql_upgrade.py +++ b/tests/test_sql_upgrade.py @@ -45,8 +45,7 @@ CONF = config.CONF DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id -class SqlUpgradeTests(test.TestCase): - +class SqlMigrateBase(test.TestCase): def initialize_sql(self): self.metadata = sqlalchemy.MetaData() self.metadata.bind = self.engine @@ -55,12 +54,15 @@ class SqlUpgradeTests(test.TestCase): test.testsdir('test_overrides.conf'), test.testsdir('backend_sql.conf')] - #override this to sepcify the complete list of configuration files + #override this to specify the complete list of configuration files def config_files(self): return self._config_file_list + def repo_package(self): + return None + def setUp(self): - super(SqlUpgradeTests, self).setUp() + super(SqlMigrateBase, self).setUp() self.config(self.config_files()) self.base = sql.Base() @@ -71,7 +73,7 @@ class SqlUpgradeTests(test.TestCase): autocommit=False) self.initialize_sql() - self.repo_path = migration._find_migrate_repo() + self.repo_path = migration.find_migrate_repo(self.repo_package()) self.schema = versioning_api.ControlledSchema.create( self.engine, self.repo_path, 0) @@ -85,7 +87,64 @@ class SqlUpgradeTests(test.TestCase): autoload=True) self.downgrade(0) table.drop(self.engine, checkfirst=True) - super(SqlUpgradeTests, self).tearDown() + super(SqlMigrateBase, self).tearDown() + + def select_table(self, name): + table = sqlalchemy.Table(name, + self.metadata, + autoload=True) + s = sqlalchemy.select([table]) + return s + + def assertTableExists(self, table_name): + try: + self.select_table(table_name) + except sqlalchemy.exc.NoSuchTableError: + raise AssertionError('Table "%s" does not exist' % table_name) + + def assertTableDoesNotExist(self, table_name): + """Asserts that a given table exists cannot be selected by name.""" + # Switch to a different metadata otherwise you might still + # detect renamed or dropped tables + try: + temp_metadata = sqlalchemy.MetaData() + temp_metadata.bind = self.engine + sqlalchemy.Table(table_name, temp_metadata, autoload=True) + except sqlalchemy.exc.NoSuchTableError: + pass + else: + raise AssertionError('Table "%s" already exists' % table_name) + + def upgrade(self, *args, **kwargs): + self._migrate(*args, **kwargs) + + def downgrade(self, *args, **kwargs): + self._migrate(*args, downgrade=True, **kwargs) + + def _migrate(self, version, repository=None, downgrade=False, + current_schema=None): + repository = repository or self.repo_path + err = '' + version = versioning_api._migrate_version(self.schema, + version, + not downgrade, + err) + if not current_schema: + current_schema = self.schema + changeset = current_schema.changeset(version) + for ver, change in changeset: + self.schema.runchange(ver, change, changeset.step) + self.assertEqual(self.schema.version, version) + + def assertTableColumns(self, table_name, expected_cols): + """Asserts that the table contains the expected set of columns.""" + self.initialize_sql() + table = self.select_table(table_name) + actual_cols = [col.name for col in table.columns] + self.assertEqual(expected_cols, actual_cols, '%s table' % table_name) + + +class SqlUpgradeTests(SqlMigrateBase): def test_blank_db_to_start(self): self.assertTableDoesNotExist('user') @@ -108,13 +167,6 @@ class SqlUpgradeTests(test.TestCase): self.downgrade(x - 1) self.upgrade(x) - def assertTableColumns(self, table_name, expected_cols): - """Asserts that the table contains the expected set of columns.""" - self.initialize_sql() - table = self.select_table(table_name) - actual_cols = [col.name for col in table.columns] - self.assertEqual(expected_cols, actual_cols, '%s table' % table_name) - def test_upgrade_add_initial_tables(self): self.upgrade(1) self.assertTableColumns("user", ["id", "name", "extra"]) @@ -594,6 +646,22 @@ class SqlUpgradeTests(test.TestCase): 'data': json.dumps({"roles": [role2['id']]})} session.execute(metadata_table.insert().values(role_grant)) + # Create another user to test the case where member_role_id is already + # assigned. + user2 = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'domain_id': domain['id'], + 'password': uuid.uuid4().hex, + 'enabled': True, + 'extra': json.dumps({})} + session.execute(user_table.insert().values(user2)) + + # Grant CONF.member_role_id to User2 + role_grant = {'user_id': user2['id'], + 'tenant_id': project['id'], + 'data': json.dumps({"roles": [CONF.member_role_id]})} + session.execute(metadata_table.insert().values(role_grant)) + session.commit() self.upgrade(17) @@ -635,6 +703,14 @@ class SqlUpgradeTests(test.TestCase): 'data': data} cmd = user_project_metadata_table.insert().values(role_grant) self.engine.execute(cmd) + + # Create another conflicting user-project for User2 + data = json.dumps({"roles": [role2['id']]}) + role_grant = {'user_id': user2['id'], + 'project_id': project['id'], + 'data': data} + cmd = user_project_metadata_table.insert().values(role_grant) + self.engine.execute(cmd) # End Scaffolding session.commit() @@ -1278,50 +1354,6 @@ class SqlUpgradeTests(test.TestCase): 'extra': json.dumps(extra)}) self.engine.execute(ins) - def select_table(self, name): - table = sqlalchemy.Table(name, - self.metadata, - autoload=True) - s = sqlalchemy.select([table]) - return s - - def assertTableExists(self, table_name): - try: - self.select_table(table_name) - except sqlalchemy.exc.NoSuchTableError: - raise AssertionError('Table "%s" does not exist' % table_name) - - def assertTableDoesNotExist(self, table_name): - """Asserts that a given table exists cannot be selected by name.""" - # Switch to a different metadata otherwise you might still - # detect renamed or dropped tables - try: - temp_metadata = sqlalchemy.MetaData() - temp_metadata.bind = self.engine - sqlalchemy.Table(table_name, temp_metadata, autoload=True) - except sqlalchemy.exc.NoSuchTableError: - pass - else: - raise AssertionError('Table "%s" already exists' % table_name) - - def upgrade(self, *args, **kwargs): - self._migrate(*args, **kwargs) - - def downgrade(self, *args, **kwargs): - self._migrate(*args, downgrade=True, **kwargs) - - def _migrate(self, version, repository=None, downgrade=False): - repository = repository or self.repo_path - err = '' - version = versioning_api._migrate_version(self.schema, - version, - not downgrade, - err) - changeset = self.schema.changeset(version) - for ver, change in changeset: - self.schema.runchange(ver, change, changeset.step) - self.assertEqual(self.schema.version, version) - def _mysql_check_all_tables_innodb(self): database = self.engine.url.database diff --git a/tests/test_token_provider.py b/tests/test_token_provider.py index ac0b0d6b..a7e92717 100644 --- a/tests/test_token_provider.py +++ b/tests/test_token_provider.py @@ -333,7 +333,6 @@ class TestTokenProvider(test.TestCase): def setUp(self): super(TestTokenProvider, self).setUp() self.load_backends() - self.token_provider_api = token.provider.Manager() def test_get_token_version(self): self.assertEqual( diff --git a/tests/test_v3.py b/tests/test_v3.py index 60a52d69..4f00de7d 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -5,7 +5,6 @@ from lxml import etree import webtest from keystone import test -from keystone import token from keystone import auth from keystone.common import serializer @@ -45,8 +44,6 @@ class RestfulTestCase(test_content_types.RestfulTestCase): test.setup_test_database() self.load_backends() - self.token_provider_api = token.provider.Manager() - self.public_app = webtest.TestApp( self.loadapp('keystone', name='main')) self.admin_app = webtest.TestApp( diff --git a/tests/test_v3_identity.py b/tests/test_v3_identity.py index 5eaf9085..f1e19c42 100644 --- a/tests/test_v3_identity.py +++ b/tests/test_v3_identity.py @@ -105,6 +105,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'domain': ref}) return self.assertValidDomainResponse(r, ref) + def test_create_domain_400(self): + """Call ``POST /domains``.""" + self.post('/domains', body={'domain': {}}, expected_status=400) + def test_list_domains(self): """Call ``GET /domains``.""" r = self.get('/domains') @@ -266,19 +270,19 @@ class IdentityTestCase(test_v3.RestfulTestCase): # Check all the domain2 relevant entities are gone self.assertRaises(exception.DomainNotFound, self.identity_api.get_domain, - domain_id=self.domain2['id']) + self.domain2['id']) self.assertRaises(exception.ProjectNotFound, self.identity_api.get_project, - tenant_id=self.project2['id']) + self.project2['id']) self.assertRaises(exception.GroupNotFound, self.identity_api.get_group, - group_id=self.group2['id']) + self.group2['id']) self.assertRaises(exception.UserNotFound, self.identity_api.get_user, - user_id=self.user2['id']) + self.user2['id']) self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, - credential_id=self.credential2['id']) + self.credential2['id']) # ...and that all self.domain entities are still here r = self.identity_api.get_domain(self.domain['id']) @@ -313,6 +317,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'project': ref}) self.assertValidProjectResponse(r, ref) + def test_create_project_400(self): + """Call ``POST /projects``.""" + self.post('/projects', body={'project': {}}, expected_status=400) + def test_get_project(self): """Call ``GET /projects/{project_id}``.""" r = self.get( @@ -376,6 +384,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'user': ref}) return self.assertValidUserResponse(r, ref) + def test_create_user_400(self): + """Call ``POST /users``.""" + self.post('/users', body={'user': {}}, expected_status=400) + def test_list_users(self): """Call ``GET /users``.""" r = self.get('/users') @@ -511,7 +523,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): # that reference this project self.assertRaises(exception.CredentialNotFound, self.credential_api.get_credential, - credential_id=self.credential['id']) + self.credential['id']) # And the no tokens we remain valid tokens = self.token_api.list_tokens(self.user['id']) self.assertEquals(len(tokens), 0) @@ -529,6 +541,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'group': ref}) return self.assertValidGroupResponse(r, ref) + def test_create_group_400(self): + """Call ``POST /groups``.""" + self.post('/groups', body={'group': {}}, expected_status=400) + def test_list_groups(self): """Call ``GET /groups``.""" r = self.get('/groups') @@ -569,6 +585,10 @@ class IdentityTestCase(test_v3.RestfulTestCase): body={'role': ref}) return self.assertValidRoleResponse(r, ref) + def test_create_role_400(self): + """Call ``POST /roles``.""" + self.post('/roles', body={'role': {}}, expected_status=400) + def test_list_roles(self): """Call ``GET /roles``.""" r = self.get('/roles') diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py index 369dd952..362df922 100644 --- a/tests/test_wsgi.py +++ b/tests/test_wsgi.py @@ -14,8 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import webob - from keystone import test from keystone.common import wsgi @@ -34,42 +32,11 @@ class BaseWSGITest(test.TestCase): super(BaseWSGITest, self).setUp() def _make_request(self, url='/'): - req = webob.Request.blank(url) + req = wsgi.Request.blank(url) args = {'action': 'index', 'controller': None} req.environ['wsgiorg.routing_args'] = [None, args] return req - def test_mask_password(self): - message = ("test = 'password': 'aaaaaa', 'param1': 'value1', " - "\"new_password\": 'bbbbbb'") - self.assertEqual(wsgi.mask_password(message, True), - u"test = 'password': '***', 'param1': 'value1', " - "\"new_password\": '***'") - - message = "test = 'password' : 'aaaaaa'" - self.assertEqual(wsgi.mask_password(message, False, '111'), - "test = 'password' : '111'") - - message = u"test = u'password' : u'aaaaaa'" - self.assertEqual(wsgi.mask_password(message, True), - u"test = u'password' : u'***'") - - message = 'test = "password" : "aaaaaaaaa"' - self.assertEqual(wsgi.mask_password(message), - 'test = "password" : "***"') - - message = 'test = "original_password" : "aaaaaaaaa"' - self.assertEqual(wsgi.mask_password(message), - 'test = "original_password" : "***"') - - message = 'test = "original_password" : ""' - self.assertEqual(wsgi.mask_password(message), - 'test = "original_password" : "***"') - - message = 'test = "param1" : "value"' - self.assertEqual(wsgi.mask_password(message), - 'test = "param1" : "value"') - class ApplicationTest(BaseWSGITest): def test_response_content_type(self): @@ -212,3 +179,36 @@ class MiddlewareTest(BaseWSGITest): app = factory(self.app) self.assertIn("testkey", app.kwargs) self.assertEquals("test", app.kwargs["testkey"]) + + +class WSGIFunctionTest(test.TestCase): + def test_mask_password(self): + message = ("test = 'password': 'aaaaaa', 'param1': 'value1', " + "\"new_password\": 'bbbbbb'") + self.assertEqual(wsgi.mask_password(message, True), + u"test = 'password': '***', 'param1': 'value1', " + "\"new_password\": '***'") + + message = "test = 'password' : 'aaaaaa'" + self.assertEqual(wsgi.mask_password(message, False, '111'), + "test = 'password' : '111'") + + message = u"test = u'password' : u'aaaaaa'" + self.assertEqual(wsgi.mask_password(message, True), + u"test = u'password' : u'***'") + + message = 'test = "password" : "aaaaaaaaa"' + self.assertEqual(wsgi.mask_password(message), + 'test = "password" : "***"') + + message = 'test = "original_password" : "aaaaaaaaa"' + self.assertEqual(wsgi.mask_password(message), + 'test = "original_password" : "***"') + + message = 'test = "original_password" : ""' + self.assertEqual(wsgi.mask_password(message), + 'test = "original_password" : "***"') + + message = 'test = "param1" : "value"' + self.assertEqual(wsgi.mask_password(message), + 'test = "param1" : "value"') |