summaryrefslogtreecommitdiffstats
path: root/tests/test_v3.py
blob: 28475e14401aef72cef186201e4ab9d1f4f1abcb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import uuid

from keystone.common.sql import util as sql_util
from keystone import test
from keystone import config

import test_content_types


# Shorthand for keystone's config.CONF; the link assertions in this module
# interpolate CONF.public_endpoint with it to build the expected URL prefix.
CONF = config.CONF


class RestfulTestCase(test_content_types.RestfulTestCase):
    """Base test case with helpers for exercising the '/v3' API paths.

    Provides factories for reference dicts (``new_*_ref``), thin wrappers
    around ``admin_request`` for each HTTP method, and shared assertions for
    single-entity and list responses.
    """

    def setUp(self):
        """Load the SQL-backed configuration and start both servers.

        Note: the parent class's ``setUp`` is not invoked here.
        """
        self.config([
            test.etcdir('keystone.conf.sample'),
            test.testsdir('test_overrides.conf'),
            test.testsdir('backend_sql.conf'),
            test.testsdir('backend_sql_disk.conf')])
        sql_util.setup_test_database()
        self.load_backends()
        self.public_server = self.serveapp('keystone', name='main')
        self.admin_server = self.serveapp('keystone', name='admin')

    def tearDown(self):
        """Stop both servers and tear down the test database."""
        try:
            self.public_server.kill()
            self.admin_server.kill()
        finally:
            # Always drop the server references and run the database
            # teardown, even if stopping one of the servers raised.
            self.public_server = None
            self.admin_server = None
            sql_util.teardown_test_database()

    def new_ref(self):
        """Populates a ref with attributes common to all API entities."""
        return {
            'id': uuid.uuid4().hex,
            'name': uuid.uuid4().hex,
            'description': uuid.uuid4().hex,
            'enabled': True}

    def new_service_ref(self):
        """Return a new ref with an additional random 'type'."""
        ref = self.new_ref()
        ref['type'] = uuid.uuid4().hex
        return ref

    def new_endpoint_ref(self, service_id):
        """Return a new ref carrying 'interface', 'service_id' and 'url'.

        The interface is the first 8 hex characters of a random UUID.
        """
        ref = self.new_ref()
        ref['interface'] = uuid.uuid4().hex[:8]
        ref['service_id'] = service_id
        ref['url'] = uuid.uuid4().hex
        return ref

    def new_domain_ref(self):
        """Return a new ref with only the common attributes."""
        return self.new_ref()

    def new_project_ref(self, domain_id):
        """Return a new ref tagged with the given 'domain_id'."""
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        return ref

    def new_user_ref(self, domain_id, project_id=None):
        """Return a new ref with 'domain_id' and a random 'email'.

        'project_id' is only included when a truthy value is given.
        """
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        ref['email'] = uuid.uuid4().hex
        if project_id:
            ref['project_id'] = project_id
        return ref

    def new_group_ref(self, domain_id):
        """Return a new ref tagged with the given 'domain_id'."""
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        return ref

    def new_credential_ref(self, user_id, project_id=None):
        """Return a new ref with 'user_id' and random 'blob' and 'type'.

        'project_id' is only included when a truthy value is given.
        """
        ref = self.new_ref()
        ref['user_id'] = user_id
        ref['blob'] = uuid.uuid4().hex
        ref['type'] = uuid.uuid4().hex
        if project_id:
            ref['project_id'] = project_id
        return ref

    def new_role_ref(self):
        """Return a new ref with only the common attributes."""
        return self.new_ref()

    def new_policy_ref(self):
        """Return a new ref with random 'blob' and 'type' values."""
        ref = self.new_ref()
        ref['blob'] = uuid.uuid4().hex
        ref['type'] = uuid.uuid4().hex
        return ref

    def get_scoped_token(self):
        """Convenience method so that we can test authenticated requests."""
        # FIXME(dolph): should use real auth, i.e. POST credentials to
        # '/v3/tokens' and return the issued token id, instead of relying
        # on this hard-coded token value.
        return 'ADMIN'

    def v3_request(self, path, **kwargs):
        """Issue an admin request against '/v3' + path using the test token.

        Remaining keyword arguments are passed through to ``admin_request``,
        whose result is returned unchanged.
        """
        path = '/v3' + path
        return self.admin_request(
            path=path,
            token=self.get_scoped_token(),
            **kwargs)

    def get(self, path, **kwargs):
        """Send a GET request via ``v3_request``."""
        return self.v3_request(method='GET', path=path, **kwargs)

    def head(self, path, **kwargs):
        """Send a HEAD request via ``v3_request``."""
        return self.v3_request(method='HEAD', path=path, **kwargs)

    def post(self, path, **kwargs):
        """Send a POST request via ``v3_request``."""
        return self.v3_request(method='POST', path=path, **kwargs)

    def put(self, path, **kwargs):
        """Send a PUT request via ``v3_request``."""
        return self.v3_request(method='PUT', path=path, **kwargs)

    def patch(self, path, **kwargs):
        """Send a PATCH request via ``v3_request``."""
        return self.v3_request(method='PATCH', path=path, **kwargs)

    def delete(self, path, **kwargs):
        """Send a DELETE request via ``v3_request``."""
        return self.v3_request(method='DELETE', path=path, **kwargs)

    def assertValidListResponse(self, resp, key, entity_validator, ref=None,
                                expected_length=None):
        """Make assertions common to all API list responses.

        If a reference is provided, its ID will be searched for in the
        response, and the matching entity asserted to be equal to it.

        :param resp: response whose ``body`` holds the collection
        :param key: key in ``resp.body`` under which the list is stored
        :param entity_validator: callable invoked as ``validator(entity)``
            for every entity and as ``validator(entity, ref)`` for the
            entity matching ``ref``
        :param ref: optional reference dict expected in the list
        :param expected_length: optional exact number of entities expected
        :returns: the list of entities found under ``key``
        """
        entities = resp.body.get(key)
        self.assertIsNotNone(entities)

        if expected_length is not None:
            self.assertEqual(len(entities), expected_length)
        elif ref is not None:
            # we're at least expecting the ref
            self.assertTrue(len(entities))

        # collections should have relational links
        self.assertIsNotNone(resp.body.get('links'))
        self.assertIn('previous', resp.body['links'])
        self.assertIn('self', resp.body['links'])
        self.assertIn('next', resp.body['links'])
        self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self'])

        for entity in entities:
            self.assertIsNotNone(entity)
            self.assertValidEntity(entity)
            entity_validator(entity)
        if ref:
            matches = [x for x in entities if x['id'] == ref['id']]
            # Fail with a readable assertion instead of an IndexError when
            # the reference is missing from the collection.
            self.assertTrue(
                matches,
                'expected entity %s not found in response' % ref['id'])
            entity = matches[0]
            self.assertValidEntity(entity, ref)
            entity_validator(entity, ref)
        return entities

    def assertValidResponse(self, resp, key, entity_validator, ref):
        """Make assertions common to all API responses.

        :param resp: response whose ``body`` holds the entity
        :param key: key in ``resp.body`` under which the entity is stored
        :param entity_validator: callable invoked as
            ``validator(entity, ref)``
        :param ref: reference dict the entity is compared against
        :returns: the entity found under ``key``
        """
        entity = resp.body.get(key)
        self.assertIsNotNone(entity)
        self.assertValidEntity(entity, ref)
        entity_validator(entity, ref)
        return entity

    def assertValidEntity(self, entity, ref=None):
        """Make assertions common to all API entities.

        If a reference is provided, the entity will also be compared against
        the reference.

        :param entity: entity dict taken from a response body
        :param ref: optional reference dict; when truthy, 'name',
            'description' and 'enabled' must match the entity
        :returns: the entity that was passed in
        """
        keys = ['name', 'description', 'enabled']

        for k in ['id'] + keys:
            msg = '%s unnexpectedly None in %s' % (k, entity)
            self.assertIsNotNone(entity.get(k), msg)

        # every entity must link to itself under the public endpoint
        self.assertIsNotNone(entity.get('links'))
        self.assertIsNotNone(entity['links'].get('self'))
        self.assertIn(CONF.public_endpoint % CONF, entity['links']['self'])
        self.assertIn(entity['id'], entity['links']['self'])

        if ref:
            for k in keys:
                msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
                self.assertEqual(ref[k], entity[k], msg)

        return entity


class VersionTestCase(RestfulTestCase):
    """Test case reserved for version-related checks."""

    def test_get_version(self):
        # Placeholder: makes no requests and no assertions yet.
        pass