1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
|
import uuid
from keystone.common.sql import util as sql_util
from keystone import test
from keystone import config
import test_content_types
# Module-level handle on keystone's configuration object; the assertion
# helpers below read ``public_endpoint`` from it when validating links.
CONF = config.CONF
class RestfulTestCase(test_content_types.RestfulTestCase):
    """Shared fixture, request helpers and assertions for v3 API tests.

    ``setUp`` loads the sample, override and SQL backend configuration
    files, prepares the test database, loads the backends and serves the
    ``main`` and ``admin`` applications; ``tearDown`` undoes that work.
    """

    def setUp(self):
        """Configure the SQL backends and start the public/admin servers."""
        self.config([
            test.etcdir('keystone.conf.sample'),
            test.testsdir('test_overrides.conf'),
            test.testsdir('backend_sql.conf'),
            test.testsdir('backend_sql_disk.conf')])

        sql_util.setup_test_database()
        self.load_backends()

        self.public_server = self.serveapp('keystone', name='main')
        self.admin_server = self.serveapp('keystone', name='admin')

    def tearDown(self):
        """Stop both servers and tear down the test database."""
        self.public_server.kill()
        self.admin_server.kill()
        self.public_server = None
        self.admin_server = None

        sql_util.teardown_test_database()

    def new_ref(self):
        """Populates a ref with attributes common to all API entities."""
        return {
            'id': uuid.uuid4().hex,
            'name': uuid.uuid4().hex,
            'description': uuid.uuid4().hex,
            'enabled': True}

    def new_service_ref(self):
        """Return a common ref extended with a random ``type``."""
        ref = self.new_ref()
        ref['type'] = uuid.uuid4().hex
        return ref

    def new_endpoint_ref(self, service_id):
        """Return a common ref extended with endpoint attributes.

        ``interface`` is the first 8 hex characters of a random UUID,
        ``url`` is a random hex string and ``service_id`` is stored as given.
        """
        ref = self.new_ref()
        ref['interface'] = uuid.uuid4().hex[:8]
        ref['service_id'] = service_id
        ref['url'] = uuid.uuid4().hex
        return ref

    def new_domain_ref(self):
        """Return a common ref; domains add no extra attributes here."""
        ref = self.new_ref()
        return ref

    def new_project_ref(self, domain_id):
        """Return a common ref extended with the given ``domain_id``."""
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        return ref

    def new_user_ref(self, domain_id, project_id=None):
        """Return a common ref extended with user attributes.

        ``project_id`` is only included in the ref when it is truthy.
        """
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        ref['email'] = uuid.uuid4().hex
        if project_id:
            ref['project_id'] = project_id
        return ref

    def new_group_ref(self, domain_id):
        """Return a common ref extended with the given ``domain_id``."""
        ref = self.new_ref()
        ref['domain_id'] = domain_id
        return ref

    def new_credential_ref(self, user_id, project_id=None):
        """Return a common ref extended with credential attributes.

        ``blob`` and ``type`` are random hex strings; ``project_id`` is only
        included in the ref when it is truthy.
        """
        ref = self.new_ref()
        ref['user_id'] = user_id
        ref['blob'] = uuid.uuid4().hex
        ref['type'] = uuid.uuid4().hex
        if project_id:
            ref['project_id'] = project_id
        return ref

    def new_role_ref(self):
        """Return a common ref; roles add no extra attributes here."""
        ref = self.new_ref()
        return ref

    def new_policy_ref(self):
        """Return a common ref extended with random ``blob`` and ``type``."""
        ref = self.new_ref()
        ref['blob'] = uuid.uuid4().hex
        ref['type'] = uuid.uuid4().hex
        return ref

    def get_scoped_token(self):
        """Convenience method so that we can test authenticated requests."""
        # FIXME(dolph): should use real auth, i.e. POST password credentials
        #               and a tenant ID to /v3/tokens via admin_request() and
        #               return the token ID from the response body. Until
        #               then a fixed token value is returned.
        return 'ADMIN'

    def v3_request(self, path, **kwargs):
        """Send an authenticated admin request to ``/v3`` + ``path``.

        Remaining keyword arguments are passed through to
        ``admin_request`` unchanged; its return value is returned as-is.
        """
        path = '/v3' + path
        return self.admin_request(
            path=path,
            token=self.get_scoped_token(),
            **kwargs)

    def get(self, path, **kwargs):
        """Issue a GET through ``v3_request``."""
        return self.v3_request(method='GET', path=path, **kwargs)

    def head(self, path, **kwargs):
        """Issue a HEAD through ``v3_request``."""
        return self.v3_request(method='HEAD', path=path, **kwargs)

    def post(self, path, **kwargs):
        """Issue a POST through ``v3_request``."""
        return self.v3_request(method='POST', path=path, **kwargs)

    def put(self, path, **kwargs):
        """Issue a PUT through ``v3_request``."""
        return self.v3_request(method='PUT', path=path, **kwargs)

    def patch(self, path, **kwargs):
        """Issue a PATCH through ``v3_request``."""
        return self.v3_request(method='PATCH', path=path, **kwargs)

    def delete(self, path, **kwargs):
        """Issue a DELETE through ``v3_request``."""
        return self.v3_request(method='DELETE', path=path, **kwargs)

    def assertValidListResponse(self, resp, key, entity_validator, ref=None,
                                expected_length=None):
        """Make assertions common to all API list responses.

        If a reference is provided, its ID will be searched for in the
        response, and the matching entity asserted to be equal to it.

        :param resp: response whose ``body`` is a dict holding the list
        :param key: name of the collection inside ``resp.body``
        :param entity_validator: callable invoked as ``validator(entity)``
            for every entity and ``validator(entity, ref)`` for the match
        :param ref: optional reference entity expected in the collection
        :param expected_length: optional exact size of the collection
        :returns: the list of entities found under ``key``
        """
        entities = resp.body.get(key)
        self.assertIsNotNone(entities)

        if expected_length is not None:
            self.assertEqual(len(entities), expected_length)
        elif ref is not None:
            # we're at least expecting the ref
            self.assertTrue(len(entities))

        # collections should have relational links
        self.assertIsNotNone(resp.body.get('links'))
        self.assertIn('previous', resp.body['links'])
        self.assertIn('self', resp.body['links'])
        self.assertIn('next', resp.body['links'])
        self.assertIn(CONF.public_endpoint % CONF, resp.body['links']['self'])

        for entity in entities:
            self.assertIsNotNone(entity)
            self.assertValidEntity(entity)
            entity_validator(entity)

        if ref:
            # Fail with a readable assertion instead of an IndexError when
            # the reference is missing from the collection.
            matches = [x for x in entities if x['id'] == ref['id']]
            self.assertTrue(
                matches,
                'ref %s not found in %s collection' % (ref['id'], key))
            entity = matches[0]
            self.assertValidEntity(entity, ref)
            entity_validator(entity, ref)

        return entities

    def assertValidResponse(self, resp, key, entity_validator, ref):
        """Make assertions common to all API responses.

        :param resp: response whose ``body`` is a dict holding the entity
        :param key: name of the entity inside ``resp.body``
        :param entity_validator: callable invoked as
            ``validator(entity, ref)``
        :param ref: reference passed on to ``assertValidEntity`` and the
            validator
        :returns: the entity found under ``key``
        """
        entity = resp.body.get(key)
        self.assertIsNotNone(entity)
        self.assertValidEntity(entity, ref)
        entity_validator(entity, ref)
        return entity

    def assertValidEntity(self, entity, ref=None):
        """Make assertions common to all API entities.

        If a reference is provided, the entity will also be compared against
        the reference on ``name``, ``description`` and ``enabled``.

        :returns: the entity that was validated
        """
        keys = ['name', 'description', 'enabled']

        for k in ['id'] + keys:
            msg = '%s unnexpectedly None in %s' % (k, entity)
            self.assertIsNotNone(entity.get(k), msg)

        # every entity must link to itself under the public endpoint
        self.assertIsNotNone(entity.get('links'))
        self.assertIsNotNone(entity['links'].get('self'))
        self.assertIn(CONF.public_endpoint % CONF, entity['links']['self'])
        self.assertIn(entity['id'], entity['links']['self'])

        if ref:
            for k in keys:
                msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
                self.assertEqual(ref[k], entity[k], msg)

        return entity
class VersionTestCase(RestfulTestCase):
    """Placeholder suite; ``test_get_version`` does not assert anything yet."""

    def test_get_version(self):
        # Intentionally empty: no request is issued and nothing is checked.
        return None
|