diff options
| author | jabdul <abdulkader.j@hcl.com> | 2011-05-12 20:28:40 +0530 |
|---|---|---|
| committer | jabdul <abdulkader.j@hcl.com> | 2011-05-12 20:28:40 +0530 |
| commit | 921e981a1e18bb45033b7565026f58cb3ea7293e (patch) | |
| tree | 6ed36be16f26b89f827a03cb50e5e9bae365b99d | |
| parent | b96d1716ff0057dcfe4905fef6d598160a23157c (diff) | |
| parent | 1b4fc952f27a292a3f164e42b17d81b1be8bb596 (diff) | |
Merge branch test_users and test_common
Conflicts:
test/unit/test_common.py
test/unit/test_users.py
| -rw-r--r-- | README | 72 | ||||
| -rw-r--r-- | README.md | 113 | ||||
| -rwxr-xr-x | echo/bin/echod | 26 | ||||
| -rw-r--r-- | echo/echo/__init__.py | 18 | ||||
| -rw-r--r-- | echo/echo/server.py (renamed from echo/echo/echo.py) | 5 | ||||
| -rw-r--r-- | keystone/db/sqlalchemy/api.py | 7 | ||||
| -rw-r--r-- | keystone/logic/service.py | 33 | ||||
| -rw-r--r-- | setup.py | 2 | ||||
| -rw-r--r-- | test/unit/test_authentication.py | 64 | ||||
| -rw-r--r-- | test/unit/test_common.py | 184 | ||||
| -rw-r--r-- | test/unit/test_exthandler.py | 49 | ||||
| -rw-r--r-- | test/unit/test_groups.py | 662 | ||||
| -rw-r--r-- | test/unit/test_keystone.py | 24 | ||||
| -rw-r--r-- | test/unit/test_tenant_groups.py | 888 | ||||
| -rw-r--r-- | test/unit/test_tenants.py | 334 | ||||
| -rw-r--r-- | test/unit/test_token.py | 74 | ||||
| -rw-r--r-- | test/unit/test_version.py | 27 |
17 files changed, 1357 insertions, 1225 deletions
@@ -1,3 +1,4 @@ + Keystone: Identity Service ========================== @@ -60,13 +61,22 @@ Or using pip: RUNNING KEYSTONE: ----------------- -From the topdir +From the topdir + + $ bin/keystone-control --pid-file pidfile auth <start|stop|restart> + +config.py takes the config file from topdir/etc/keystone.conf - $ bin/keystone-control --config-file etc/keystone.conf --pid-file=pidfile auth <start|stop|restart> +If the keystone package is also intalled on the system +/etc/keystone.conf or /etc/keystone/keystone.conf has higher priority +than <top_dir>/etc/keystone.conf. If you are also doing development on a +system that has keystone.conf installed in /etc/you need to disambiguate it by -The "start" command invokes bin/keystone-auth. During development you can also run + $ bin/keystone-control --confg-file etc/keystone.conf --pid-file pidfile auth <start|stop|restart> - $ bin/keystone-auth etc/keystone.conf +Also, keystone-control calls keystone-auth and it need to be in the PATH + + $ export PATH=<top_dir>/bin:$PATH @@ -90,7 +100,6 @@ DEMO CLIENT: $ cd echo/echo $ python echo_client.py - INSTALLING KEYSTONE: -------------------- @@ -98,11 +107,6 @@ INSTALLING KEYSTONE: $ sudo python setup.py install -RUNNING KEYSTONE (Eventlet Server): ------------------------------------ - sudo keystone (start|stop|restart) - - INSTALLING TEST SERVICE: ------------------------ @@ -124,16 +128,11 @@ To clean the test database $ sqlite3 keystone/keystone.db < test/kill.sql -To run unit tests: - - $ python test/unit/test_identity.py - To run client demo (with all auth middleware running locally on sample service): $ python echo/echo/echo.py $ python echo/echo/echo_client.py - To perform contract validation and load testing, use SoapUI (for now). Using SOAPUI: @@ -152,45 +151,15 @@ Unit Test on Identity Services In order to run the unit test on identity services start the auth sever $ cd test/unit - $ ../../bin/keystone-control --config-file ../..etc/keystone.conf --pid-file=pidfile auth start + $ ../../bin/keystone-auth -Once the Identity service is running, go to unit test/unit directory +There are 8 groups of tests. They can be run individually or as an entire colection. To run the entire test suite run - $ python test_identity.py + $ python test_keystone -You can run a sbuset of tests the following way - $ grep class test_identity.py - -You get something like - - -class identity_test(unittest.TestCase): -class authorize_test(identity_test): -class validate_token(authorize_test): -class tenant_test(unittest.TestCase): -class create_tenant_test(tenant_test): -class get_tenants_test(tenant_test): -class get_tenant_test(tenant_test): -class update_tenant_test(tenant_test): -class delete_tenant_test(tenant_test): -class tenant_group_test(unittest.TestCase): -class create_tenant_group_test(tenant_group_test): -class get_tenant_groups_test(tenant_group_test): -class get_tenant_group_test(tenant_group_test): -class update_tenant_group_test(tenant_group_test): -class delete_tenant_group_test(tenant_test): -class global_group_test(unittest.TestCase): -class create_global_group_test(global_group_test): -class create_tenant_group_test(tenant_group_test): - -You can choose any class you like to test - - $ python test_identity.py delete_tenant_test - -For more on unit testing please refer - - $ python test_identity --help +A test can also be run individually e.g. + $ python test_token DATABASE SCHEMA @@ -205,6 +174,3 @@ DATABASE SCHEMA - - - diff --git a/README.md b/README.md new file mode 100644 index 00000000..0fb7c64e --- /dev/null +++ b/README.md @@ -0,0 +1,113 @@ +Keystone: Identity Service +========================== + +Keystone is a proposed independent authentication service for [OpenStack](http://www.openstack.org). + +This initial proof of concept aims to address the current use cases in Swift and Nova which are: + +* REST-based, token auth for Swift +* many-to-many relationship between identity and tenant for Nova. + + +SERVICES: +--------- + +* Keystone - authentication service +* Auth_Token - WSGI middleware that can be used to handle token auth protocol (WSGI or remote proxy) +* Echo - A sample service that responds by returning call details + +Also included: + +* Auth_Basic - Stub for WSGI middleware that will be used to handle basic auth +* Auth_OpenID - Stub for WSGI middleware that will be used to handle openid auth protocol +* RemoteAuth - WSGI middleware that can be used in services (like Swift, Nova, and Glance) when Auth middleware is running remotely + + +ENVIRONMENT & DEPENDENCIES: +--------------------------- +see pip-requires for dependency list +Setup: +Install http://pypi.python.org/pypi/setuptools + sudo easy_install pip + sudo pip install -r pip-requires + + +RUNNING KEYSTONE: +----------------- + + $ cd bin + $ ./keystoned + + +RUNNING TEST SERVICE: +--------------------- + + Standalone stack (with Auth_Token) + $ cd echo/bin + $ ./echod + + Distributed stack (with RemoteAuth local and Auth_Token remote) + $ cd echo/bon + $ ./echod --remote + + in separate session + $ cd keystone/auth_protocols + $ python auth_token.py --remote + + +DEMO CLIENT: +--------------------- + $ cd echo/echo + $ python echo_client.py + Note: this requires tests data. See section TESTING for initializing data + + +TESTING +------- + +After starting keystone a keystone.db sqlite database should be created in the keystone folder. + +Add test data to the database: + + $ sqlite3 keystone/keystone.db < test/test_setup.sql + +To clean the test database + + $ sqlite3 keystone/keystone.db < test/kill.sql + +To run unit tests: + + $ python test/unit/test_identity.py + +To run client demo (with all auth middleware running locally on sample service): + + $ ./echo/bin/echod + $ python echo/echo/echo_client.py + + +To perform contract validation and load testing, use SoapUI (for now). + +Using SOAPUI: + +Download [SOAPUI](http://sourceforge.net/projects/soapui/files/): + +To Test Keystone Service: + +* File->Import Project +* Select tests/IdentitySOAPUI.xml +* Double click on "Keystone Tests" and press the green play (>) button + + +Unit Test on Identity Services +------------------------------ +In order to run the unit test on identity services: +* start the keystone server +* cat test_setup.sql |sqlite ../../keystone/keystone.db +* go to unit test/unit directory +* python test_identity.py + +For more on unit testing please refer + + python test_identity --help + + diff --git a/echo/bin/echod b/echo/bin/echod new file mode 100755 index 00000000..d20a1b5e --- /dev/null +++ b/echo/bin/echod @@ -0,0 +1,26 @@ +#!/bin/sh +# Copyright (C) 2011 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# If ../echo/__init__.py exists, add ../ to the Python search path so +# that it will override whatever may be installed in the default Python +# search path. +script_dir=`dirname $0` +if [ -f "$script_dir/../echo/__init__.py" ] +then + PYTHONPATH="$script_dir/..:$PYTHONPATH" + export PYTHONPATH +fi + +/usr/bin/env python -m echo.server $* diff --git a/echo/echo/__init__.py b/echo/echo/__init__.py index 52bcde04..3c39fdbc 100644 --- a/echo/echo/__init__.py +++ b/echo/echo/__init__.py @@ -1 +1,17 @@ -from echo import app_factory +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from server import app_factory diff --git a/echo/echo/echo.py b/echo/echo/server.py index e7fa38d3..8c24aa8f 100644 --- a/echo/echo/echo.py +++ b/echo/echo/server.py @@ -159,5 +159,6 @@ if __name__ == "__main__": app = loadapp("config:" + \ os.path.join(os.path.abspath(os.path.dirname(__file__)), ini), global_conf={"log_name": "echo.log"}) - - wsgi.server(eventlet.listen(('', port)), app) + listener = eventlet.listen(('', port)) + pool = eventlet.GreenPool(1000) + wsgi.server(listener, app, custom_pool=pool) diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py index f1817a24..9fd9a734 100644 --- a/keystone/db/sqlalchemy/api.py +++ b/keystone/db/sqlalchemy/api.py @@ -471,6 +471,13 @@ def token_for_user(user_id, session=None): return result +def token_for_user_tenant(user_id, tenant_id, session=None): + if not session: + session = get_session() + result = session.query(models.Token).filter_by( + user_id=user_id, tenant_id=tenant_id).order_by("expires desc").first() + return result + def user_tenant_create(values): user_tenant_ref = models.UserTenantAssociation() user_tenant_ref.update(values) diff --git a/keystone/logic/service.py b/keystone/logic/service.py index a51e3959..efe2f930 100644 --- a/keystone/logic/service.py +++ b/keystone/logic/service.py @@ -51,17 +51,26 @@ class IDMService(object): # Look for an existing token, or create one, # TODO: Handle tenant/token search # - dtoken = db_api.token_for_user(duser.id) + if not credentials.tenant_id: + dtoken = db_api.token_for_user(duser.id) + else: + dtoken = db_api.token_for_user_tenant(duser.id, credentials.tenant_id) if not dtoken or dtoken.expires < datetime.now(): dtoken = db_models.Token() dtoken.token_id = str(uuid.uuid4()) dtoken.user_id = duser.id + if not duser.tenants: raise fault.IDMFault("Strange: user %s is not associated " "with a tenant!" % duser.id) - dtoken.tenant_id = duser.tenants[0].tenant_id + if not credentials.tenant_id and db_api.user_get_by_tenant(duser.id, credentials.tenant_id): + raise fault.IDMFault("Error: user %s is not associated " + "with a tenant! %s" % (duser.id, + credentials.tenant_id)) + dtoken.tenant_id = credentials.tenant_id + else: + dtoken.tenant_id = duser.tenants[0].tenant_id dtoken.expires = datetime.now() + timedelta(days=1) - db_api.token_create(dtoken) return self.__get_auth_data(dtoken, duser) @@ -81,6 +90,7 @@ class IDMService(object): % user.id) return self.__get_auth_data(token, user) + def revoke_token(self, admin_token, token_id): self.__validate_token(admin_token) @@ -116,6 +126,7 @@ class IDMService(object): return tenant + ## ## GET Tenants with Pagination ## @@ -136,8 +147,6 @@ class IDMService(object): if next: links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" \ % (url, next, limit))) - - return tenants.Tenants(ts, links) def get_tenant(self, admin_token, tenant_id): @@ -146,7 +155,6 @@ class IDMService(object): dtenant = db_api.tenant_get(tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant could not be found") - return tenants.Tenant(dtenant.id, dtenant.desc, dtenant.enabled) def update_tenant(self, admin_token, tenant_id, tenant): @@ -159,11 +167,8 @@ class IDMService(object): dtenant = db_api.tenant_get(tenant_id) if dtenant == None: raise fault.ItemNotFoundFault("The tenant cloud not be found") - values = {'desc': tenant.description, 'enabled': tenant.enabled} - db_api.tenant_update(tenant_id, values) - return tenants.Tenant(dtenant.id, tenant.description, tenant.enabled) def delete_tenant(self, admin_token, tenant_id): @@ -209,9 +214,7 @@ class IDMService(object): dtenant.id = group.group_id dtenant.desc = group.description dtenant.tenant_id = tenant - db_api.tenant_group_create(dtenant) - return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id) def get_tenant_groups(self, admin_token, tenantId, marker, limit, url): @@ -849,7 +852,13 @@ class IDMService(object): if len(duser.tenants) == 0: raise fault.IDMFault("Strange: user %s is not associated " "with a tenant!" % duser.id) - user = auth.User(duser.id, duser.tenants[0].tenant_id, groups) + if not dtoken.tenant_id and \ + db_api.user_get_by_tenant(duser.id, dtoken.tenant_id): + raise fault.IDMFault("Error: user %s is not associated " + "with a tenant! %s" % (duser.id, + dtoken.tenant_id)) + + user = auth.User(duser.id, dtoken.tenant_id, groups) return auth.AuthData(token, user) def __validate_token(self, token_id, admin=True): @@ -16,7 +16,7 @@ from setuptools import setup, find_packages -version = '1.0' +version = '1.0' setup( name='keystone', diff --git a/test/unit/test_authentication.py b/test/unit/test_authentication.py index 43838143..78bf0486 100644 --- a/test/unit/test_authentication.py +++ b/test/unit/test_authentication.py @@ -8,41 +8,41 @@ from webtest import TestApp import httplib2 import json from lxml import etree -import unittest -from webtest import TestApp -from test_common import * + +import test_common as utils class authentication_test(unittest.TestCase): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.tenant = utils.get_tenant() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() def tearDown(self): - delete_token(self.token, self.auth_token) + utils.delete_token(self.token, self.auth_token) def test_a_authorize(self): - resp, content = get_token('joeuser', 'secrete') + resp, content = utils.get_token('joeuser', 'secrete', '') self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_a_authorize_xml(self): - resp, content = get_token_xml('joeuser', 'secrete') + resp, content = utils.get_token_xml('joeuser', 'secrete', '', + self.tenant) self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) + self.assertEqual('application/xml', utils.content_type(resp)) def test_a_authorize_user_disabled(self): - h = httplib2.Http(".cache") - url = '%stoken' % URL + header = httplib2.Http(".cache") + url = '%stoken' % utils.URL body = {"passwordCredentials": {"username": "disabled", "password": "secrete"}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json"}) content = json.loads(content) if int(resp['status']) == 500: @@ -50,17 +50,17 @@ class authentication_test(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_a_authorize_user_disabled_xml(self): - h = httplib2.Http(".cache") - url = '%stoken' % URL + header = httplib2.Http(".cache") + url = '%stoken' % utils.URL body = '<?xml version="1.0" encoding="UTF-8"?> \ <passwordCredentials \ xmlns="http://docs.openstack.org/idm/api/v1.0" \ password="secrete" username="disabled" \ />' - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) content = etree.fromstring(content) @@ -69,14 +69,14 @@ class authentication_test(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) + self.assertEqual('application/xml', utils.content_type(resp)) def test_a_authorize_user_wrong(self): - h = httplib2.Http(".cache") - url = '%stoken' % URL + header = httplib2.Http(".cache") + url = '%stoken' % utils.URL body = {"passwordCredentials": {"username-w": "disabled", "password": "secrete"}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json"}) content = json.loads(content) if int(resp['status']) == 500: @@ -84,17 +84,17 @@ class authentication_test(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(400, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_a_authorize_user_wrong_xml(self): - h = httplib2.Http(".cache") - url = '%stoken' % URL + header = httplib2.Http(".cache") + url = '%stoken' % utils.URL body = '<?xml version="1.0" encoding="UTF-8"?> \ <passwordCredentials \ xmlns="http://docs.openstack.org/idm/api/v1.0" \ password="secrete" username-w="disabled" \ />' - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) content = etree.fromstring(content) @@ -103,7 +103,7 @@ class authentication_test(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(400, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) + self.assertEqual('application/xml', utils.content_type(resp)) if __name__ == '__main__': - unittest.main()
\ No newline at end of file + unittest.main() diff --git a/test/unit/test_common.py b/test/unit/test_common.py index 3b9197a8..79222674 100644 --- a/test/unit/test_common.py +++ b/test/unit/test_common.py @@ -8,100 +8,105 @@ from webtest import TestApp import httplib2 import json from lxml import etree -import unittest -from webtest import TestApp + URL = 'http://localhost:8080/v1.0/' -def get_token(user, pswd, kind=''): - h = httplib2.Http(".cache") - url = '%stoken' % URL +def get_token(user, pswd, kind='', tenant_id=None): + header = httplib2.Http(".cache") + url = '%stoken' % URL + if not tenant_id: body = {"passwordCredentials": {"username": user, "password": pswd}} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json"}) - content = json.loads(content) - - token = str(content['auth']['token']['id']) - if kind == 'token': - return token - else: - return (resp, content) + else: + body = {"passwordCredentials": {"username": user, + "password": pswd, + "tenantId": tenant_id}} + + resp, content = header.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + token = str(content['auth']['token']['id']) + if kind == 'token': + return token + else: + return (resp, content) def delete_token(token, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stoken/%s' % (URL, token) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def create_tenant(tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants' % (URL) body = {"tenant": {"id": tenantid, "description": "A description ...", "enabled": True}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def create_tenant_group(groupid, tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenant/%s/groups' % (URL, tenantid) body = {"group": {"id": groupid, "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def delete_tenant(tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s' % (URL, tenantid) - resp, content = h.request(url, "DELETE", body='{}', + resp, content = header.request(url, "DELETE", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def delete_tenant_group(groupid, tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) - resp, content = h.request(url, "DELETE", body='{}', + resp, content = header.request(url, "DELETE", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def create_global_group(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups' % (URL) body = {"group": {"id": groupid, - "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), + "description": "A description ..."}} + resp, content = header.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def create_global_group_xml(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups' % (URL) body = '<?xml version="1.0" encoding="UTF-8"?>\ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A Description of the group</description>\ </group>' % groupid - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -109,33 +114,39 @@ def create_global_group_xml(groupid, auth_token): def delete_global_group(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s' % (URL, groupid) - resp, content = h.request(url, "DELETE", body='{}', + resp, content = header.request(url, "DELETE", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def delete_global_group_xml(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s' % (URL, groupid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) return (resp, content) -def get_token_xml(user, pswd, type=''): - h = httplib2.Http(".cache") +def get_token_xml(user, pswd, type='', tenant_id=None): + header = httplib2.Http(".cache") url = '%stoken' % URL - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <passwordCredentials \ - xmlns="http://docs.openstack.org/idm/api/v1.0" \ - password="%s" username="%s" \ - tenantId="77654"/> ' % (pswd, user) - resp, content = h.request(url, "POST", body=body, + if tenant_id: + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="%s" username="%s" \ + tenantId="%s"/> ' % (pswd, user, tenant_id) + else: + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="%s" username="%s" /> ' % (pswd, user) + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) dom = etree.fromstring(content) @@ -149,9 +160,9 @@ def get_token_xml(user, pswd, type=''): def delete_token_xml(token, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stoken/%s' % (URL, token) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -159,14 +170,14 @@ def delete_token_xml(token, auth_token): def create_tenant_xml(tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants' % (URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % tenantid - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -174,14 +185,14 @@ def create_tenant_xml(tenantid, auth_token): def create_tenant_group_xml(groupid, tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenant/%s/groups' % (URL, tenantid) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ <description>A description...</description> \ </group>' % groupid - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -189,9 +200,9 @@ def create_tenant_group_xml(groupid, tenantid, auth_token): def delete_tenant_xml(tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s' % (URL, tenantid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -199,9 +210,9 @@ def delete_tenant_xml(tenantid, auth_token): def delete_tenant_group_xml(groupid, tenantid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -209,28 +220,39 @@ def delete_tenant_group_xml(groupid, tenantid, auth_token): def create_user(tenantid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/users' % (URL, tenantid) body = {"user": {"password": "secrete", "id": userid, "tenantId": tenantid, "email": "%s@rackspace.com" % userid, "enabled": True}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_user(tenant, userid, auth_token): + header = httplib2.Http(".cache") + url = '%stenants/%s/users/%s' % (URL, tenant, userid) + + resp, content = header.request(url, "DELETE", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) + return (resp, content) def create_user_xml(tenantid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/users' % (URL, tenantid) body = '<?xml version="1.0" encoding="UTF-8"?> \ <user xmlns="http://docs.openstack.org/idm/api/v1.0" \ email="joetest@rackspace.com" \ tenantId="%s" id="%s" \ enabled="true" password="secrete"/>' % (tenantid, userid) - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -248,9 +270,9 @@ def delete_user(tenant, userid, auth_token): def delete_user_xml(tenantid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/users/%s' % (URL, tenantid, userid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -393,20 +415,20 @@ def users_group_get_xml(tenant_id, user_id, auth_token): def add_user_tenant_group(tenantid, groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid) - resp, content = h.request(url, "PUT", body='', + resp, content = header.request(url, "PUT", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def add_user_tenant_group_xml(tenantid, groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid) - resp, content = h.request(url, "PUT", body='', + resp, content = header.request(url, "PUT", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -414,20 +436,20 @@ def add_user_tenant_group_xml(tenantid, groupid, userid, auth_token): def delete_user_tenant_group(tenantid, groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def delete_user_tenant_group_xml(tenantid, groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -435,20 +457,20 @@ def delete_user_tenant_group_xml(tenantid, groupid, userid, auth_token): def get_user_tenant_group(tenantid, groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users' % (URL, tenantid, groupid) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def get_user_tenant_group_xml(tenantid, groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%stenants/%s/groups/%s/users' % (URL, tenantid, groupid) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -456,20 +478,20 @@ def get_user_tenant_group_xml(tenantid, groupid, auth_token): def add_user_global_group(groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users/%s' % (URL, groupid, userid) - resp, content = h.request(url, "PUT", body='', + resp, content = header.request(url, "PUT", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def add_user_global_group_xml(groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users/%s' % (URL, groupid, userid) - resp, content = h.request(url, "PUT", body='', + resp, content = header.request(url, "PUT", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -477,20 +499,20 @@ def add_user_global_group_xml(groupid, userid, auth_token): def delete_user_global_group(groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users/%s' % (URL, groupid, userid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) return (resp, content) def delete_user_global_group_xml(groupid, userid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users/%s' % (URL, groupid, userid) - resp, content = h.request(url, "DELETE", body='', + resp, content = header.request(url, "DELETE", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) @@ -498,10 +520,10 @@ def delete_user_global_group_xml(groupid, userid, auth_token): def get_user_global_group(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users' % (URL, groupid) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": auth_token}) @@ -521,10 +543,10 @@ def get_email(): def get_user_global_group_xml(groupid, auth_token): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") url = '%sgroups/%s/users' % (URL, groupid) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": auth_token, "ACCEPT": "application/xml"}) diff --git a/test/unit/test_exthandler.py b/test/unit/test_exthandler.py new file mode 100644 index 00000000..4d09c148 --- /dev/null +++ b/test/unit/test_exthandler.py @@ -0,0 +1,49 @@ +import os +import sys +# Need to access identity module +sys.path.append(os.path.abspath(os.path.join( + os.getcwd(), '..', '..', 'keystone'))) +from queryext.exthandler import UrlExtensionFilter +import unittest + + +class MockWsgiApp(object): + + def __init__(self): + pass + + def __call__(self, env, start_response): + pass + + +def _start_response(): + pass + + +class UrlExtensionFilterTest(unittest.TestCase): + + def setUp(self): + self.filter = UrlExtensionFilter(MockWsgiApp(), {}) + + def test_xml_extension(self): + env = {'PATH_INFO': '/v1.0/someresource.xml'} + self.filter(env, _start_response) + self.assertEqual('/v1.0/someresource', env['PATH_INFO']) + self.assertEqual('application/xml', env['HTTP_ACCEPT']) + + def test_json_extension(self): + env = {'PATH_INFO': '/v1.0/someresource.json'} + self.filter(env, _start_response) + self.assertEqual('/v1.0/someresource', env['PATH_INFO']) + self.assertEqual('application/json', env['HTTP_ACCEPT']) + + def test_extension_overrides_header(self): + env = {'PATH_INFO': '/v1.0/someresource.json', + 'HTTP_ACCEPT': 'application/xml'} + self.filter(env, _start_response) + self.assertEqual('/v1.0/someresource', env['PATH_INFO']) + self.assertEqual('application/json', env['HTTP_ACCEPT']) + + +if __name__ == '__main__': + unittest.main() diff --git a/test/unit/test_groups.py b/test/unit/test_groups.py index 1e6e4be1..1ca4dff3 100644 --- a/test/unit/test_groups.py +++ b/test/unit/test_groups.py @@ -8,39 +8,40 @@ from webtest import TestApp import httplib2 import json from lxml import etree -import unittest -from webtest import TestApp -from test_common import * + +import test_common as utils ## ## Global Group Tests ## + + class global_group_test(unittest.TestCase): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.globaltenant = get_global_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.globaltenant = utils.get_global_tenant() + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() self.global_group = 'test_global_group_add' def tearDown(self): - resp, content = delete_global_group(self.global_group, + resp, content = utils.delete_global_group(self.global_group, self.auth_token) - resp, content = delete_tenant(self.globaltenant, self.auth_token) + resp, content = utils.delete_tenant(self.globaltenant, self.auth_token) class create_global_group_test(global_group_test): def test_global_group_create(self): - respG, contentG = delete_global_group(self.global_group, + respG, contentG = utils.delete_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - + if int(respG['status']) == 500: self.fail('IDM fault') elif int(respG['status']) == 503: @@ -49,36 +50,34 @@ class create_global_group_test(global_group_test): self.fail('Failed due to %d' % int(respG['status'])) def test_global_group_create_xml(self): - respG, contentG = delete_global_group_xml(self.global_group, + respG, contentG = utils.delete_global_group_xml(self.global_group, str(self.auth_token)) - - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - + if int(respG['status']) == 500: self.fail('IDM fault') elif int(respG['status']) == 503: self.fail('Service Not Available') - + if int(respG['status']) not in (200, 201): self.fail('Failed due to %d' % int(respG['status'])) def test_global_group_create_again(self): - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) if int(respG['status']) == 500: self.fail('IDM fault') elif int(respG['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(respG['status'])) - def test_global_group_create_again_xml(self): - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) contentG = etree.fromstring(contentG) if int(respG['status']) == 500: @@ -86,13 +85,12 @@ class create_global_group_test(global_group_test): elif int(respG['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(respG['status'])) - def test_global_group_create_unauthorized_token(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = {"group": {"id": self.global_group, "description": "A description ..."}} resp, content = h.request(url, "POST", body=json.dumps(body), @@ -106,7 +104,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_unauthorized_token_xml(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ @@ -124,13 +122,13 @@ class create_global_group_test(global_group_test): def test_global_group_create_expired_token(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = {"group": {"id": self.global_group, "description": "A description ..."}} resp, content = h.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token - }) + "X-Auth-Token": \ + self.exp_auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -139,7 +137,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_expired_token_xml(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A description...</description> \ @@ -156,7 +154,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_missing_token(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = {"group": {"id": self.global_group, "description": "A description ..."}} resp, content = h.request(url, "POST", body=json.dumps(body), @@ -169,7 +167,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_missing_token_xml(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A description...</description> \ @@ -185,13 +183,13 @@ class create_global_group_test(global_group_test): def test_global_group_create_disabled_token(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '{"group": { "id": "%s", \ "description": "A description ..." } }' % self.global_group resp, content = h.request(url, "POST", body=body, headers={"Content-Type": "application/json", - "X-Auth-Token": self.disabled_token - }) + "X-Auth-Token": \ + self.disabled_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -200,7 +198,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_disabled_token_xml(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A description...</description> \ @@ -217,7 +215,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_invalid_token(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '{"group": { "id": "%s", \ "description": "A description ..." } }' % self.globaltenant resp, content = h.request(url, "POST", body=body, @@ -231,7 +229,7 @@ class create_global_group_test(global_group_test): def test_global_group_create_invalid_token_xml(self): h = httplib2.Http(".cache") - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A description...</description> \ @@ -251,12 +249,12 @@ class get_global_groups_test(global_group_test): def test_get_global_groups(self): h = httplib2.Http(".cache") - respG, contentG = delete_global_group(self.global_group, + respG, contentG = utils.delete_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - - url = '%sgroups' % (URL) + + url = '%sgroups' % (utils.URL) resp, content = h.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) @@ -268,9 +266,9 @@ class get_global_groups_test(global_group_test): def test_get_global_groups_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, @@ -283,9 +281,9 @@ class get_global_groups_test(global_group_test): def test_get_global_groups_unauthorized_token(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='{}', headers={"Content-Type": "application/json", @@ -298,9 +296,9 @@ class get_global_groups_test(global_group_test): def test_get_global_groups_unauthorized_token_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/xml", @@ -314,14 +312,14 @@ class get_global_groups_test(global_group_test): def test_get_global_groups_exp_token(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='{}', headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token - }) + "X-Auth-Token": \ + self.exp_auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -330,9 +328,9 @@ class get_global_groups_test(global_group_test): def test_get_global_groups_exp_token_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups' % (URL) + url = '%sgroups' % (utils.URL) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/xml", @@ -349,9 +347,9 @@ class get_global_group_test(global_group_test): def test_get_global_group(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, self.global_group) + url = '%sgroups/%s' % (utils.URL, self.global_group) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='{}', headers={"Content-Type": "application/json", @@ -364,9 +362,9 @@ class get_global_group_test(global_group_test): def test_get_global_group_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, self.global_group) + url = '%sgroups/%s' % (utils.URL, self.global_group) #test for Content-Type = application/json resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/xml", @@ -380,9 +378,9 @@ class get_global_group_test(global_group_test): def test_get_global_group_bad(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, 'global_group_bad') + url = '%sgroups/%s' % (utils.URL, 'global_group_bad') #test for Content-Type = application/json resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/json", @@ -395,9 +393,9 @@ class get_global_group_test(global_group_test): def test_get_global_group_bad_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL , 'global_group_bad') + url = '%sgroups/%s' % (utils.URL, 'global_group_bad') #test for Content-Type = application/json resp, content = h.request(url, "GET", body='', headers={"Content-Type": "application/xml", @@ -409,15 +407,14 @@ class get_global_group_test(global_group_test): self.fail('Service Not Available') self.assertEqual(404, int(resp['status'])) - class update_global_groups_test(global_group_test): def test_update_global_group(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, self.global_group) + url = '%sgroups/%s' % (utils.URL, self.global_group) resp, content = h.request(url, "PUT", body='{"group":{\ "id" : "%s","description" :\ "A New description of the group..."}}' % self.global_group, @@ -435,10 +432,10 @@ class update_global_groups_test(global_group_test): def test_update_global_group_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - - url = '%sgroups/%s' % (URL, self.global_group) + + url = '%sgroups/%s' % (utils.URL, self.global_group) data = u'<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A NEW description...</description> \ @@ -448,7 +445,7 @@ class update_global_groups_test(global_group_test): headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) - + body = etree.fromstring(content) desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") if int(resp['status']) == 500: @@ -461,9 +458,9 @@ class update_global_groups_test(global_group_test): def test_update_global_group_bad(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, self.global_group) + url = '%sgroups/%s' % (utils.URL, self.global_group) data = '{"group": { "description_bad": "A NEW description...", \ "id":"%s" }}'\ % (self.global_group) @@ -479,9 +476,9 @@ class update_global_groups_test(global_group_test): def test_update_global_group_bad_xml(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group_xml(self.global_group, + respG, contentG = utils.create_global_group_xml(self.global_group, str(self.auth_token)) - url = '%sgroups/%s' % (URL, self.global_group) + url = '%sgroups/%s' % (utils.URL, self.global_group) data = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description_bad>A NEW description...</description> \ @@ -500,9 +497,9 @@ class update_global_groups_test(global_group_test): def test_update_global_group_not_found(self): h = httplib2.Http(".cache") - respG, contentG = create_global_group(self.global_group, + respG, contentG = utils.create_global_group(self.global_group, str(self.auth_token)) - url = '%sgroups/NonexistingID' % (URL) + url = '%sgroups/NonexistingID' % (utils.URL) data = '{"group": { "description": "A NEW description...", \ "id":"NonexistingID"}}' #test for Content-Type = application/json @@ -513,9 +510,9 @@ class update_global_groups_test(global_group_test): def test_update_global_group_not_found_xml(self): h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.globaltenant, + resp, content = utils.create_tenant_xml(self.globaltenant, str(self.auth_token)) - url = '%sgroups/NonexistingID' % (URL) + url = '%sgroups/NonexistingID' % (utils.URL) data = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="NonexistingID"> \ @@ -536,506 +533,467 @@ class update_global_groups_test(global_group_test): class delete_global_group_test(global_group_test): def test_delete_global_group_not_found(self): - resp, content = delete_global_group("test_global_group_1", + resp, content = utils.delete_global_group("test_global_group_1", str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_global_group_not_found_xml(self): - resp, content = delete_global_group_xml("test_global_group_1", + resp, content = utils.delete_global_group_xml("test_global_group_1", str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_global_group(self): - resp, content = create_tenant(self.globaltenant, str(self.auth_token)) - respG, contentG = create_tenant_group('test_global_group_delete', + resp, content = utils.create_tenant(self.globaltenant, + str(self.auth_token)) + respG, contentG = utils.create_tenant_group('test_global_group_delete', self.globaltenant, str(self.auth_token)) - respG, contentG = delete_global_group('test_global_group_delete', + respG, contentG = utils.delete_global_group('test_global_group_delete', str(self.auth_token)) - resp, content = delete_tenant(self.globaltenant, + resp, content = utils.delete_tenant(self.globaltenant, str(self.auth_token)) self.assertEqual(204, int(respG['status'])) def test_delete_global_group_xml(self): - resp, content = create_tenant_xml(self.globaltenant, - str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_global_group_delete', + resp, content = utils.create_tenant_xml(self.globaltenant, + str(self.auth_token)) + respG, contentG = utils.create_tenant_group_xml(\ + 'test_global_group_delete', self.globaltenant, str(self.auth_token)) - respG, contentG = delete_global_group_xml('test_global_group_delete', + respG, contentG = utils.delete_global_group_xml(\ + 'test_global_group_delete', str(self.auth_token)) - resp, content = delete_tenant_xml(self.globaltenant, + resp, content = utils.delete_tenant_xml(self.globaltenant, str(self.auth_token)) self.assertEqual(204, int(resp['status'])) - - class add_user_global_group_test(unittest.TestCase): - + def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_global_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.tenant = utils.get_global_tenant() + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() self.global_group = 'test_global_group' - - def tearDown(self): - respG, contentG = delete_user_global_group(self.global_group, + respG, contentG = utils.delete_user_global_group(self.global_group, self.user, str(self.auth_token)) - - respG, contentG = delete_user(self.tenant, self.user, + + respG, contentG = utils.delete_user(self.tenant, self.user, str(self.auth_token)) - resp, content = delete_global_group(self.global_group, + resp, content = utils.delete_global_group(self.global_group, self.auth_token) - - - + def test_add_user_global_group(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, str(self.auth_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') if int(respG['status']) not in (200, 201): self.fail('Failed due to %d' % int(respG['status'])) - - + def test_add_user_global_group_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, str(self.auth_token) - ) - - + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') if int(respG['status']) not in (200, 201): self.fail('Failed due to %d' % int(respG['status'])) - - + def test_add_user_global_group_conflict(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, str(self.auth_token) - ) - respG, contentG = add_user_global_group(self.global_group, - self.user, str(self.auth_token) - ) - - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(respG['status'])) - + def test_add_user_global_group_conflict_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, str(self.auth_token) - ) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, str(self.auth_token) - ) - + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(respG['status'])) - + def test_add_user_global_group_unauthorized(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, str(self.token) - ) - - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(respG['status'])) - + def test_add_user_global_group_unauthorized_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, str(self.token) - ) - + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(respG['status'])) - + def test_add_user_global_group_forbidden(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.disabled_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.disabled_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - + def test_add_user_global_group_forbidden_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, - str(self.disabled_token) - ) + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.disabled_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - + class get_users_tenant_group_test(unittest.TestCase): - + def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_global_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.tenant = utils.get_global_tenant() + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() self.global_group = 'test_global_group' - - def tearDown(self): - respG, contentG = delete_user_global_group(self.global_group, + respG, contentG = utils.delete_user_global_group(self.global_group, self.user, str(self.auth_token)) - - respG, contentG = delete_user(self.tenant, self.user, + + respG, contentG = utils.delete_user(self.tenant, self.user, str(self.auth_token)) - resp, content = delete_global_group(self.global_group, - self.auth_token) + resp, content = utils.delete_global_group(self.global_group, + self.auth_token) + def test_get_users_global_group(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group(self.global_group, - str(self.auth_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group(self.global_group, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(respG['status'])) - - + def test_get_users_global_group_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group_xml(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group_xml(self.global_group, - str(self.auth_token) - ) + respG, contentG = utils.add_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group_xml(self.global_group, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(respG['status'])) - - + def test_get_users_global_group_unauthorized(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - - respG, contentG = get_user_global_group(self.global_group, - str(self.token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + + respG, contentG = utils.get_user_global_group(self.global_group, + str(self.token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(respG['status'])) - + def test_get_users_global_group_unauthorized_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group_xml(self.global_group, - str(self.token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group_xml(self.global_group, + str(self.token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(respG['status'])) - + def test_get_users_global_group_forbidden(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group(self.global_group, - str(self.disabled_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group(self.global_group, + str(self.disabled_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - + def test_get_users_global_group_forbidden_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group_xml(self.global_group, - str(self.disabled_token) - ) + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group_xml(self.global_group, + str(self.disabled_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - + def test_get_users_global_group_expired(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group(self.global_group, - str(self.exp_auth_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group(self.global_group, + str(self.exp_auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - + def test_get_users_global_group_expired_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = get_user_global_group_xml(self.global_group, - str(self.exp_auth_token) - ) + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.get_user_global_group_xml(self.global_group, + str(self.exp_auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(respG['status'])) - -class delete_users_global_group_test(unittest.TestCase): - + + +class delete_users_global_group_test(unittest.TestCase): + def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_global_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.tenant = utils.get_global_tenant() + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() self.global_group = 'test_global_group' - - def tearDown(self): - respG, contentG = delete_user_global_group(self.global_group, + respG, contentG = utils.delete_user_global_group(self.global_group, self.user, str(self.auth_token)) - - respG, contentG = delete_user(self.tenant, self.user, + + respG, contentG = utils.delete_user(self.tenant, self.user, str(self.auth_token)) - resp, content = delete_global_group(self.global_group, + resp, content = utils.delete_global_group(self.global_group, self.auth_token) + def test_delete_user_global_group(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - - respG, contentG = delete_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + + respG, contentG = utils.delete_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(204, int(respG['status'])) - - + def test_delete_user_global_group_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = delete_user_global_group_xml(self.global_group, - self.user, - str(self.auth_token) - ) + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.delete_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(204, int(respG['status'])) - + def test_delete_user_global_group_notfound(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.disabled_token) - ) - respG, contentG = delete_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = delete_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.disabled_token)) + respG, contentG = utils.delete_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.delete_user_global_group(self.global_group, + self.user, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(404, int(respG['status'])) - + def test_delete_user_global_group_notfound_xml(self): h = httplib2.Http(".cache") - resp, content = create_global_group(self.global_group, + resp, content = utils.create_global_group(self.global_group, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + respG, contentG = utils.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_global_group(self.global_group, - self.user, - str(self.disabled_token) - ) - respG, contentG = delete_user_global_group(self.global_group, - self.user, - str(self.auth_token) - ) - respG, contentG = delete_user_global_group_xml(self.global_group, - self.user, - str(self.auth_token) - ) - + respG, contentG = utils.add_user_global_group(self.global_group, + self.user, + str(self.disabled_token)) + respG, contentG = utils.delete_user_global_group(self.global_group, + self.user, + str(self.auth_token)) + respG, contentG = utils.delete_user_global_group_xml(self.global_group, + self.user, + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(404, int(respG['status'])) + if __name__ == '__main__': - unittest.main()
\ No newline at end of file + unittest.main() diff --git a/test/unit/test_keystone.py b/test/unit/test_keystone.py index 31b6ae32..9a6fe4df 100644 --- a/test/unit/test_keystone.py +++ b/test/unit/test_keystone.py @@ -1,9 +1,11 @@ -#TODO (India Team) Need to modify this script import logging import os import unittest - -MODULE_EXTENSIONS = set('.py .pyc .pyo'.split()) +from lxml import etree +MODULE_EXTENSIONS = set('.py'.split()) +TEST_FILES = ['test_authentication.py', 'test_keystone.py', 'test_tenants.py', + 'test_common.py', 'test_users.py','test_tenant_groups.py', + 'test_token.py', 'test_version.py', 'test_groups.py'] def unit_test_extractor(tup, path, filenames): """Pull ``unittest.TestSuite``s from modules in path @@ -16,17 +18,19 @@ def unit_test_extractor(tup, path, filenames): relpath = os.path.relpath(path, package_path) relpath_pieces = relpath.split(os.sep) - if relpath_pieces[0] == '.': # Base directory. - relpath_pieces.pop(0) # Otherwise, screws up module name. + if relpath_pieces[0] == '.': # Base directory. + relpath_pieces.pop(0) # Otherwise, screws up module name. elif not any(os.path.exists(os.path.join(path, '__init__' + ext)) for ext in MODULE_EXTENSIONS): - return # Not a package directory and not the base directory, reject. + return # Not a package directory and not the base directory, reject. logging.info('Base: %s', '.'.join(relpath_pieces)) for filename in filenames: - base, ext = os.path.splitext(filename) - if ext not in MODULE_EXTENSIONS: # Not a Python module. + if filename not in TEST_FILES: continue + base, ext = os.path.splitext(filename) + #if ext not in MODULE_EXTENSIONS : # Not a Python module. + # continue logging.info('Module: %s', base) module_name = '.'.join(relpath_pieces + [base]) logging.info('Importing from %s', module_name) @@ -35,6 +39,7 @@ def unit_test_extractor(tup, path, filenames): logging.info('Got suites: %s', module_suites) suites += module_suites + def get_test_suites(path): """:return: Iterable of suites for the packages/modules present under :param:`path`. @@ -50,5 +55,4 @@ if __name__ == '__main__': package_path = os.path.dirname(os.path.abspath(__file__)) suites = get_test_suites(package_path) for suite in suites: - unittest.TextTestRunner(verbosity=2).run(suite) - + unittest.TextTestRunner(verbosity=1).run(suite) diff --git a/test/unit/test_tenant_groups.py b/test/unit/test_tenant_groups.py index f15bd909..9c46db42 100644 --- a/test/unit/test_tenant_groups.py +++ b/test/unit/test_tenant_groups.py @@ -4,130 +4,120 @@ import sys sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), '..', '..', '..', '..', 'keystone'))) import unittest -from webtest import TestApp import httplib2 import json -from lxml import etree -import unittest -from webtest import TestApp -from test_common import * +from lxml import etree +import test_common as util class tenant_group_test(unittest.TestCase): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = util.get_token('joeuser', 'secrete', 'token') + self.tenant = util.get_tenant() + self.user = util.get_user() + self.userdisabled = util.get_userdisabled() + self.auth_token = util.get_auth_token() + self.exp_auth_token = util.get_exp_auth_token() + self.disabled_token = util.get_disabled_token() self.tenant_group = 'test_tenant_group_add' def tearDown(self): - resp, content = delete_tenant_group(self.tenant_group, + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, self.auth_token) - resp, content = delete_tenant(self.tenant, self.auth_token) + resp, content = util.delete_tenant(self.tenant, self.auth_token) class create_tenant_group_test(tenant_group_test): def test_tenant_group_create(self): - resp, content = delete_tenant(self.tenant, str(self.auth_token)) - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = delete_tenant_group(self.tenant_group, + resp, content = util.delete_tenant(self.tenant, str(self.auth_token)) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) + if int(resp['status']) not in (200, 201): + self.fail('Failed due to %d' % int(resp['status'])) def test_tenant_group_create_xml(self): - resp, content = delete_tenant_xml(self.tenant, str(self.auth_token)) - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) - respG, contentG = delete_tenant_group_xml(self.tenant_group, + resp, content = util.delete_tenant_xml(self.tenant, + str(self.auth_token)) + resp, content = util.create_tenant_xml(self.tenant, + str(self.auth_token)) + resp, content = util.delete_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group, + resp, content = util.create_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) - self.tenant = self.tenant - self.tenant_group = self.tenant_group + content = etree.fromstring(content) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) + if int(resp['status']) not in (200, 201): + self.fail('Failed due to %d' % int(resp['status'])) def test_tenant_group_create_again(self): - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, + str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - if int(respG['status']) == 200: + if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] - if int(respG['status']) == 500: + self.tenant_group = content['group']['id'] + if int(resp['status']) == 500: self.fail('IDM fault') - elif int(respG['status']) == 503: + elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] + self.assertEqual(409, int(resp['status'])) def test_tenant_group_create_again_xml(self): - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group, + resp, content = util.create_tenant_xml("test_tenant", + str(self.auth_token)) + resp, content = util.create_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group, + resp_new, content_new = util.create_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) - content = etree.fromstring(content) - contentG = etree.fromstring(contentG) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") - if int(respG['status']) == 500: + if int(resp['status']) == 500: self.fail('IDM fault') - elif int(respG['status']) == 503: + elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") + self.assertEqual(409, int(resp['status'])) def test_tenant_group_create_unauthorized_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + resp, content = util.create_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) + if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - if int(respG['status']) == 200: - self.tenant_group = respG['group']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + self.tenant_group = resp['group']['id'] + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = {"group": {"id": self.tenant_group, "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": self.token}) if int(resp['status']) == 500: @@ -137,16 +127,16 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(401, int(resp['status'])) def test_tenant_group_create_unauthorized_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"><description>A description...</description> \ </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.token, "ACCEPT": "application/xml"}) @@ -157,14 +147,14 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(401, int(resp['status'])) def test_tenant_group_create_expired_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = {"group": {"id": self.tenant_group, "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": self.exp_auth_token}) if int(resp['status']) == 500: @@ -174,18 +164,19 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(403, int(resp['status'])) def test_tenant_group_create_expired_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ <description>A description...</description> \ </group>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.exp_auth_token, "ACCEPT": "application/xml"}) @@ -196,14 +187,14 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(403, int(resp['status'])) def test_tenant_group_create_missing_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = {"group": {"id": self.tenant_group, "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json"}) if int(resp['status']) == 500: self.fail('IDM fault') @@ -211,20 +202,20 @@ class create_tenant_group_test(tenant_group_test): self.fail('Service Not Available') self.assertEqual(401, int(resp['status'])) - def test_tenant_group_create_missing_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ <description>A description...</description> \ </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) if int(resp['status']) == 500: @@ -233,19 +224,18 @@ class create_tenant_group_test(tenant_group_test): self.fail('Service Not Available') self.assertEqual(401, int(resp['status'])) - def test_tenant_group_create_disabled_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '{"group": { "id": "%s", \ "description": "A description ..." } }' % self.tenant_group - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/json", - "X-Auth-Token": self.disabled_token}) + "X-Auth-Token": self.disabled_token}) if int(resp['status']) == 500: self.fail('IDM fault') @@ -254,19 +244,20 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(403, int(resp['status'])) def test_tenant_group_create_disabled_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ <description>A description...</description> \ </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.disabled_token, "ACCEPT": "application/xml"}) @@ -277,15 +268,15 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(403, int(resp['status'])) def test_tenant_group_create_invalid_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '{"group": { "id": "%s", \ "description": "A description ..." } }' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/json", "X-Auth-Token": 'nonexsitingtoken'}) @@ -296,19 +287,20 @@ class create_tenant_group_test(tenant_group_test): self.assertEqual(404, int(resp['status'])) def test_tenant_group_create_invalid_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ <description>A description...</description> \ </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": 'nonexsitingtoken', "ACCEPT": "application/xml"}) @@ -323,16 +315,16 @@ class create_tenant_group_test(tenant_group_test): class get_tenant_groups_test(tenant_group_test): def test_get_tenant_groups(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -342,14 +334,14 @@ class get_tenant_groups_test(tenant_group_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_groups_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group, + resp, content = util.create_tenant_group_xml(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) - resp, content = h.request(url, "GET", body='', + url = '%stenant/%s/groups' % (util.URL, self.tenant) + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -360,15 +352,15 @@ class get_tenant_groups_test(tenant_group_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_groups_unauthorized_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.token}) if int(resp['status']) == 500: @@ -378,14 +370,14 @@ class get_tenant_groups_test(tenant_group_test): self.assertEqual(401, int(resp['status'])) def test_get_tenant_groups_unauthorized_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.token, "ACCEPT": "application/xml"}) @@ -396,16 +388,16 @@ class get_tenant_groups_test(tenant_group_test): self.assertEqual(401, int(resp['status'])) def test_get_tenant_groups_exp_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token}) + "X-Auth-Token": self.exp_auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -413,14 +405,14 @@ class get_tenant_groups_test(tenant_group_test): self.assertEqual(403, int(resp['status'])) def test_get_tenant_groups_exp_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL, self.tenant) + url = '%stenant/%s/groups' % (util.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.exp_auth_token, "ACCEPT": "application/xml"}) @@ -434,14 +426,15 @@ class get_tenant_groups_test(tenant_group_test): class get_tenant_group_test(tenant_group_test): def test_get_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -451,14 +444,15 @@ class get_tenant_group_test(tenant_group_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) #test for Content-Type = application/xml - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -469,15 +463,16 @@ class get_tenant_group_test(tenant_group_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_group_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, 'tenant_bad', self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, 'tenant_bad', + self.tenant_group) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{', + resp, content = header.request(url, "GET", body='{', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -487,14 +482,15 @@ class get_tenant_group_test(tenant_group_test): self.assertEqual(404, int(resp['status'])) def test_get_tenant_group_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, 'tenant_bad', self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, 'tenant_bad', + self.tenant_group) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{', + resp, content = header.request(url, "GET", body='{', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -505,32 +501,34 @@ class get_tenant_group_test(tenant_group_test): self.assertEqual(404, int(resp['status'])) def test_get_tenant_group_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, 'nonexistinggroup') + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + 'nonexistinggroup') #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: - self.fail('IDM fault') + self.fail('IDM fault') elif int(resp['status']) == 503: - self.fail('Service Not Available') + self.fail('Service Not Available') self.assertEqual(404, int(resp['status'])) def test_get_tenant_group_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, 'nonexistinggroup') + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + 'nonexistinggroup') #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -544,25 +542,25 @@ class get_tenant_group_test(tenant_group_test): class update_tenant_group_test(tenant_group_test): def test_update_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) data = '{"group": { "id":"%s","description": "A NEW description..." ,\ "tenantId":"%s" }}' % (self.tenant_group, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) - body = json.loads(content) if int(resp['status']) == 500: self.fail('IDM fault') @@ -573,20 +571,21 @@ class update_tenant_group_test(tenant_group_test): self.assertEqual('A NEW description...', body['group']['description']) def test_update_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = delete_tenant(self.tenant, str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = util.delete_tenant(self.tenant, str(self.auth_token)) - resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant , self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) data = '<group xmlns="http://docs.openstack.org/idm/api/v1.0" \ tenantId="%s" id="%s"> \ @@ -594,12 +593,11 @@ class update_tenant_group_test(tenant_group_test): </group>' % (self.tenant, self.tenant_group) #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) - body = etree.fromstring(content) desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") @@ -613,20 +611,21 @@ class update_tenant_group_test(tenant_group_test): self.assertEqual('A NEW description...', desc.text) def test_update_tenant_group_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) data = '{"group": { "description_bad": "A NEW description...",\ "id":"%s","tenantId":"%s" }}' % (self.tenant_group, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -636,22 +635,23 @@ class update_tenant_group_test(tenant_group_test): self.assertEqual(400, int(resp['status'])) def test_update_tenant_group_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group) + url = '%stenant/%s/groups/%s' % (util.URL, self.tenant, + self.tenant_group) data = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ tenantId="%s" id="%s"> \ <description_bad>A NEW description...</description> \ </group>' % (self.tenant, self.tenant_group) #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -662,20 +662,20 @@ class update_tenant_group_test(tenant_group_test): self.assertEqual(400, int(resp['status'])) def test_update_tenant_group_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + url = '%stenant/%s/groups/NonexistingID' % (util.URL, self.tenant) data = '{"group": { "description": "A NEW description...",\ "id":"NonexistingID", "tenantId"="test_tenant" }}' #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data, + resp, content = header.request(url, "GET", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -685,16 +685,16 @@ class update_tenant_group_test(tenant_group_test): self.assertEqual(404, int(resp['status'])) def test_update_tenant_group_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (util.URL, self.tenant) data = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="NonexistingID", "tenant_id"="test_tenant"> \ <description_bad>A NEW description...</description> \ </group>' #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data, + resp, content = header.request(url, "GET", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -708,103 +708,98 @@ class update_tenant_group_test(tenant_group_test): class delete_tenant_group_test(tenant_group_test): def test_delete_tenant_group_not_found(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_group("test_tenant_delete111", + resp, content = util.delete_tenant_group("test_tenant_delete111", self.tenant, str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_tenant_group_not_found_xml(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_group_xml("test_tenant_delete111", + resp, content = util.delete_tenant_group_xml("test_tenant_delete111", self.tenant, str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_tenant_group(self): - resp, content = create_tenant("test_tenant_delete", + resp, content = util.create_tenant("test_tenant_delete", str(self.auth_token)) - respG, contentG = create_tenant_group('test_tenant_group_delete', + resp, content = util.create_tenant_group('test_tenant_group_delete', "test_tenant_delete", str(self.auth_token)) - respG, contentG = delete_tenant_group('test_tenant_group_delete', + resp, content = util.delete_tenant_group('test_tenant_group_delete', "test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant("test_tenant_delete", + resp, content = util.delete_tenant("test_tenant_delete", str(self.auth_token)) - self.assertEqual(204, int(respG['status'])) + self.assertEqual(204, int(resp['status'])) def test_delete_tenant_group_xml(self): - resp, content = create_tenant("test_tenant_delete", + resp, content = util.create_tenant("test_tenant_delete", str(self.auth_token)) - respG, contentG = create_tenant_group('test_tenant_group_delete', + resp, content = util.create_tenant_group('test_tenant_group_delete', "test_tenant_delete", str(self.auth_token)) - respG, contentG = delete_tenant_group('test_tenant_group_delete', + resp, content = util.delete_tenant_group('test_tenant_group_delete', "test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_xml("test_tenant_delete", + resp, content = util.delete_tenant_xml("test_tenant_delete", str(self.auth_token)) - self.assertEqual(204, int(respG['status'])) + self.assertEqual(204, int(resp['status'])) class add_user_tenant_group_test(tenant_group_test): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') + self.token = util.get_token('joeuser', 'secrete', 'token') self.tenant = 'test_tenant' - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.user = util.get_user() + self.userdisabled = util.get_userdisabled() + self.auth_token = util.get_auth_token() + self.exp_auth_token = util.get_exp_auth_token() + self.disabled_token = util.get_disabled_token() self.tenant_group = 'test_tenant_group_add' def tearDown(self): - respG, contentG = delete_user_tenant_group(self.tenant, + resp, content = util.delete_user_tenant_group(self.tenant, self.tenant_group, self.user, str(self.auth_token)) - respG, contentG = delete_user(self.tenant, self.user, + resp, content = util.delete_user(self.tenant, self.user, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, self.auth_token) - resp, content = delete_tenant(self.tenant, self.auth_token) - + resp, content = util.delete_tenant(self.tenant, self.auth_token) def test_add_user_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) - + if int(resp['status']) not in (200, 201): + self.fail('Failed due to %d' % int(resp['status'])) def test_add_user_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, - self.tenant, - str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, + self.tenant, + str(self.auth_token)) + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, + resp, content = util.add_user_tenant_group_xml(self.tenant, self.tenant_group, self.user, str(self.auth_token)) @@ -813,403 +808,388 @@ class add_user_tenant_group_test(tenant_group_test): self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) - + if int(resp['status']) not in (200, 201): + self.fail('Failed due to %d' % int(resp['status'])) def test_add_user_tenant_group_conflict(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) - + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) + self.assertEqual(409, int(resp['status'])) def test_add_user_tenant_group_conflict_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) - respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) + resp, content = util.add_user_tenant_group_xml(self.tenant, + self.tenant_group, + self.user, str(self.auth_token)) + resp, content = util.add_user_tenant_group_xml(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) + self.assertEqual(409, int(resp['status'])) def test_add_user_tenant_group_unauthorized(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.token) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, self.token) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(respG['status'])) + self.assertEqual(401, int(resp['status'])) def test_add_user_tenant_group_unauthorized_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, - str(self.auth_token)) + resp, content = util.create_user(self.tenant, self.user, + str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group, - self.user, self.token) + resp, content = util.add_user_tenant_group_xml(self.tenant, + self.tenant_group, + self.user, self.token) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(respG['status'])) + self.assertEqual(401, int(resp['status'])) def test_add_user_tenant_group_forbidden(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, - self.tenant, - str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, + self.tenant, + str(self.auth_token)) + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.disabled_token) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + self.disabled_token) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) + self.assertEqual(403, int(resp['status'])) def test_add_user_tenant_group_forbidden_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group, - self.user, self.disabled_token) + resp, content = util.add_user_tenant_group_xml(self.tenant, + self.tenant_group, + self.user, + self.disabled_token) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) + self.assertEqual(403, int(resp['status'])) class get_users_tenant_group_test(tenant_group_test): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') + self.token = util.get_token('joeuser', 'secrete', 'token') self.tenant = 'test_tenant' - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.user = util.get_user() + self.userdisabled = util.get_userdisabled() + self.auth_token = util.get_auth_token() + self.exp_auth_token = util.get_exp_auth_token() + self.disabled_token = util.get_disabled_token() self.tenant_group = 'test_tenant_group_add' def tearDown(self): - respG, contentG = delete_user_tenant_group(self.tenant, + resp, content = util.delete_user_tenant_group(self.tenant, self.tenant_group, self.user, str(self.auth_token)) - respG, contentG = delete_user(self.tenant, self.user, + resp, content = util.delete_user(self.tenant, self.user, str(self.auth_token)) - resp, content = delete_tenant_group(self.tenant_group, + resp, content = util.delete_tenant_group(self.tenant_group, self.tenant, self.auth_token) - resp, content = delete_tenant(self.tenant, self.auth_token) + resp, content = util.delete_tenant(self.tenant, self.auth_token) def test_get_users_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) - respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group, - str(self.auth_token) - ) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) + resp, content = util.get_user_tenant_group(self.tenant, + self.tenant_group, + str(self.auth_token)) + + self.assertEqual(200, int(resp['status'])) + def test_get_users_tenant_group_xml(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(200, int(respG['status'])) - - - def test_get_users_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, + resp, content = util.add_user_tenant_group_xml(self.tenant, self.tenant_group, self.user, str(self.auth_token)) - respG, contentG = get_user_tenant_group_xml(self.tenant, + resp, content = util.get_user_tenant_group_xml(self.tenant, self.tenant_group, - str(self.auth_token) - ) + str(self.auth_token)) + + self.assertEqual(200, int(resp['status'])) + + def test_get_users_tenant_group_unauthorized(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(200, int(respG['status'])) - - - def test_get_users_tenant_group_unauthorized(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.auth_token) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + self.auth_token) - respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group, - str(self.token) - ) + resp, content = util.get_user_tenant_group(self.tenant, + self.tenant_group, + str(self.token)) + self.assertEqual(401, int(resp['status'])) + def test_get_users_tenant_group_unauthorized_xml(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(respG['status'])) - def test_get_users_tenant_group_unauthorized_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.auth_token) - respG, contentG = get_user_tenant_group_xml(self.tenant, + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, self.auth_token) + resp, content = util.get_user_tenant_group_xml(self.tenant, self.tenant_group, - str(self.token) - ) + str(self.token)) + self.assertEqual(401, int(resp['status'])) + def test_get_users_tenant_group_forbidden(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(respG['status'])) - - def test_get_users_tenant_group_forbidden(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.auth_token) - respG, contentG = get_user_tenant_group(self.tenant, + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, self.auth_token) + resp, content = util.get_user_tenant_group(self.tenant, self.tenant_group, - str(self.disabled_token) - ) + str(self.disabled_token)) + + self.assertEqual(403, int(resp['status'])) + def test_get_users_tenant_group_forbidden_xml(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) - - def test_get_users_tenant_group_forbidden_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.auth_token) - respG, contentG = get_user_tenant_group_xml(self.tenant, + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, self.auth_token) + resp, content = util.get_user_tenant_group_xml(self.tenant, self.tenant_group, - str(self.disabled_token) - ) + str(self.disabled_token)) + + self.assertEqual(403, int(resp['status'])) + + def test_get_users_tenant_group_expired(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) - - def test_get_users_tenant_group_expired(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, self.auth_token) - respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group, - str(self.exp_auth_token) - ) + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, self.auth_token) + resp, content = util.get_user_tenant_group(self.tenant, + self.tenant_group, + str(self.exp_auth_token)) + self.assertEqual(403, int(resp['status'])) + def test_get_users_tenant_group_expired_xml(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) - - def test_get_users_tenant_group_expired_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, self.user, self.auth_token) - respG, contentG = get_user_tenant_group_xml(self.tenant, + resp, content = util.get_user_tenant_group_xml(self.tenant, self.tenant_group, - str(self.exp_auth_token) - ) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(respG['status'])) + str(self.exp_auth_token)) + + self.assertEqual(403, int(resp['status'])) + class delete_users_tenant_group_test(tenant_group_test): def test_delete_user_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group, - self.user, str(self.auth_token) - ) - respG, contentG = delete_user_tenant_group(self.tenant, + resp, content = util.add_user_tenant_group(self.tenant, + self.tenant_group, + self.user, + str(self.auth_token)) + resp, content = util.delete_user_tenant_group(self.tenant, self.tenant_group, self.user, - str(self.auth_token) - ) + str(self.auth_token)) + self.assertEqual(204, int(resp['status'])) + + def test_delete_user_tenant_group_xml(self): + resp, content = util.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(204, int(respG['status'])) - - - def test_delete_user_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group, + resp, content = util.create_tenant_group(self.tenant_group, self.tenant, str(self.auth_token)) - respG, contentG = create_user(self.tenant, self.user, + resp, content = util.create_user(self.tenant, self.user, str(self.auth_token)) - respG, contentG = add_user_tenant_group_xml(self.tenant, + resp, content = util.add_user_tenant_group_xml(self.tenant, self.tenant_group, self.user, str(self.auth_token)) - respG, contentG = delete_user_tenant_group_xml(self.tenant, + resp, content = util.delete_user_tenant_group_xml(self.tenant, self.tenant_group, self.user, str(self.auth_token)) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(204, int(respG['status'])) + self.assertEqual(204, int(resp['status'])) def test_delete_user_tenant_group_notfound(self): - h = httplib2.Http(".cache") - - respG, contentG = delete_user_tenant_group(self.tenant, + resp, content = util.delete_user_tenant_group(self.tenant, self.tenant_group, 'NonExistinguser', - str(self.auth_token) - ) - - if int(respG['status']) == 500: + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') - elif int(respG['status']) == 503: + elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(404, int(respG['status'])) + self.assertEqual(404, int(resp['status'])) def test_delete_user_tenant_group_notfound_xml(self): - h = httplib2.Http(".cache") - - respG, contentG = delete_user_tenant_group_xml(self.tenant, + resp, content = util.delete_user_tenant_group_xml(self.tenant, self.tenant_group, 'NonExistinguser', - str(self.auth_token) - ) - - if int(respG['status']) == 500: + str(self.auth_token)) + if int(resp['status']) == 500: self.fail('IDM fault') - elif int(respG['status']) == 503: + elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(404, int(respG['status'])) + self.assertEqual(404, int(resp['status'])) if __name__ == '__main__': unittest.main() diff --git a/test/unit/test_tenants.py b/test/unit/test_tenants.py index a4713c52..0489c195 100644 --- a/test/unit/test_tenants.py +++ b/test/unit/test_tenants.py @@ -4,36 +4,32 @@ import sys sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), '..', '..', '..', '..', 'keystone'))) import unittest -from webtest import TestApp import httplib2 import json from lxml import etree -import unittest -from webtest import TestApp -from test_common import * +import test_common as utils + class tenant_test(unittest.TestCase): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.tenant = 'test_tenant' + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() def tearDown(self): - resp, content = delete_tenant(self.tenant, self.auth_token) - + resp, content = utils.delete_tenant(self.tenant, self.auth_token) + class create_tenant_test(tenant_test): - - def test_tenant_create(self): - resp, content = delete_tenant('test_tenant', str(self.auth_token)) - resp, content = create_tenant('test_tenant', str(self.auth_token)) - self.tenant = 'test_tenant' + def test_tenant_create(self): + resp, content = utils.delete_tenant(self.tenant, str(self.auth_token)) + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -44,60 +40,56 @@ class create_tenant_test(tenant_test): self.fail('Failed due to %d' % int(resp['status'])) def test_tenant_create_xml(self): - resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) - resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) - self.tenant = 'test_tenant' + resp, content = utils.delete_tenant_xml(self.tenant, + str(self.auth_token)) + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - if int(resp['status']) not in (200, 201): - self.fail('Failed due to %d' % int(resp['status'])) def test_tenant_create_again(self): - resp, content = create_tenant("test_tenant", str(self.auth_token)) - resp, content = create_tenant("test_tenant", str(self.auth_token)) + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(resp['status'])) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] def test_tenant_create_again_xml(self): - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get("id") - if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(409, int(resp['status'])) - if int(resp['status']) == 200: - self.tenant = content.get("id") def test_tenant_create_forbidden_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = {"tenant": {"id": self.tenant, "description": "A description ...", "enabled": True}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": self.token}) @@ -108,19 +100,20 @@ class create_tenant_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_tenant_create_forbidden_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.token, "ACCEPT": "application/xml"}) @@ -132,19 +125,18 @@ class create_tenant_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_tenant_create_expired_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = {"tenant": {"id": self.tenant, "description": "A description ...", "enabled": True}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json", "X-Auth-Token": self.exp_auth_token}) - if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -152,20 +144,21 @@ class create_tenant_test(tenant_test): self.assertEqual(403, int(resp['status'])) def test_tenant_create_expired_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.exp_auth_token, "ACCEPT": "application/xml"}) @@ -177,16 +170,17 @@ class create_tenant_test(tenant_test): self.assertEqual(403, int(resp['status'])) def test_tenant_create_missing_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = {"tenant": {"id": self.tenant, "description": "A description ...", "enabled": True}} - resp, content = h.request(url, "POST", body=json.dumps(body), + resp, content = header.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json"}) if int(resp['status']) == 500: @@ -196,23 +190,21 @@ class create_tenant_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_tenant_create_missing_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - - url = '%stenants' % (URL) - + url = '%stenants' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -220,19 +212,19 @@ class create_tenant_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_tenant_create_disabled_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '{"tenant": { "id": "%s", \ "description": "A description ...", "enabled"\ :true } }' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/json", - "X-Auth-Token": self.disabled_token - }) + "X-Auth-Token": self.disabled_token}) if int(resp['status']) == 500: self.fail('IDM fault') @@ -241,23 +233,22 @@ class create_tenant_test(tenant_test): self.assertEqual(403, int(resp['status'])) def test_tenant_create_disabled_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": self.disabled_token, "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -265,16 +256,17 @@ class create_tenant_test(tenant_test): self.assertEqual(403, int(resp['status'])) def test_tenant_create_invalid_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, + str(self.auth_token)) if int(resp['status']) == 200: self.tenant = content['tenant']['id'] - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '{"tenant": { "id": "%s", \ "description": "A description ...", "enabled"\ :true } }' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/json", "X-Auth-Token": 'nonexsitingtoken'}) @@ -285,19 +277,20 @@ class create_tenant_test(tenant_test): self.assertEqual(404, int(resp['status'])) def test_tenant_create_invalid_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) content = etree.fromstring(content) if int(resp['status']) == 200: self.tenant = content.get('id') - url = '%stenants' % (URL) + url = '%stenants' % (utils.URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true" id="%s"> \ <description>A description...</description> \ </tenant>' % self.tenant - resp, content = h.request(url, "POST", body=body, + resp, content = header.request(url, "POST", body=body, headers={"Content-Type": "application/xml", "X-Auth-Token": 'nonexsitingtoken', "ACCEPT": "application/xml"}) @@ -312,14 +305,13 @@ class create_tenant_test(tenant_test): class get_tenants_test(tenant_test): def test_get_tenants(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", - "X-Auth-Token": self.auth_token - }) + "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -327,11 +319,11 @@ class get_tenants_test(tenant_test): self.assertEqual(200, int(resp['status'])) def test_get_tenants_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -342,11 +334,11 @@ class get_tenants_test(tenant_test): self.assertEqual(200, int(resp['status'])) def test_get_tenants_unauthorized_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.token}) if int(resp['status']) == 500: @@ -356,11 +348,11 @@ class get_tenants_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_get_tenants_unauthorized_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.token, "ACCEPT": "application/xml"}) @@ -371,14 +363,13 @@ class get_tenants_test(tenant_test): self.assertEqual(401, int(resp['status'])) def test_get_tenants_exp_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token - }) + "X-Auth-Token": self.exp_auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: @@ -386,11 +377,11 @@ class get_tenants_test(tenant_test): self.assertEqual(403, int(resp['status'])) def test_get_tenants_exp_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.exp_auth_token, "ACCEPT": "application/xml"}) @@ -404,11 +395,11 @@ class get_tenants_test(tenant_test): class get_tenant_test(tenant_test): def test_get_tenant(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -418,11 +409,11 @@ class get_tenant_test(tenant_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -433,11 +424,11 @@ class get_tenant_test(tenant_test): self.assertEqual(200, int(resp['status'])) def test_get_tenant_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, 'tenant_bad') + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, 'tenant_bad') #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -447,11 +438,11 @@ class get_tenant_test(tenant_test): self.assertEqual(404, int(resp['status'])) def test_get_tenant_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, 'tenant_bad') + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, 'tenant_bad') #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -462,11 +453,11 @@ class get_tenant_test(tenant_test): self.assertEqual(404, int(resp['status'])) def test_get_tenant_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/NonexistingID' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}', + resp, content = header.request(url, "GET", body='{}', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -476,11 +467,11 @@ class get_tenant_test(tenant_test): self.assertEqual(404, int(resp['status'])) def test_get_tenant_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/NonexistingID' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (utils.URL) #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -494,13 +485,13 @@ class get_tenant_test(tenant_test): class update_tenant_test(tenant_test): def test_update_tenant(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) data = '{"tenant": { "description": "A NEW description..." ,\ "enabled":true }}' #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) body = json.loads(content) @@ -509,13 +500,14 @@ class update_tenant_test(tenant_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(resp['status'])) - self.assertEqual(int(self.tenant), int(body['tenant']['id'])) + self.assertEqual(self.tenant, body['tenant']['id']) self.assertEqual('A NEW description...', body['tenant']['description']) def test_update_tenant_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant_xml(self.tenant, + str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) data = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true"> \ @@ -523,7 +515,7 @@ class update_tenant_test(tenant_test): </tenant>' #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -534,18 +526,18 @@ class update_tenant_test(tenant_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(resp['status'])) - self.assertEqual(int(self.tenant), int(body.get('id'))) + self.assertEqual(self.tenant, body.get('id')) self.assertEqual('A NEW description...', desc.text) def test_update_tenant_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) data = '{"tenant": { "description_bad": "A NEW description...",\ "enabled":true }}' #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -555,16 +547,16 @@ class update_tenant_test(tenant_test): self.assertEqual(400, int(resp['status'])) def test_update_tenant_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/%s' % (URL, self.tenant) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (utils.URL, self.tenant) data = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true"> \ <description_bad>A NEW description...</description> \ </tenant>' #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data, + resp, content = header.request(url, "PUT", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -575,13 +567,13 @@ class update_tenant_test(tenant_test): self.assertEqual(400, int(resp['status'])) def test_update_tenant_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/NonexistingID' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (utils.URL) data = '{"tenant": { "description": "A NEW description...",\ "enabled":true }}' #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data, + resp, content = header.request(url, "GET", body=data, headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -591,16 +583,16 @@ class update_tenant_test(tenant_test): self.assertEqual(404, int(resp['status'])) def test_update_tenant_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenants/NonexistingID' % (URL) + header = httplib2.Http(".cache") + resp, content = utils.create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (utils.URL) data = '<?xml version="1.0" encoding="UTF-8"?> \ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ enabled="true"> \ <description_bad>A NEW description...</description> \ </tenant>' #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data, + resp, content = header.request(url, "GET", body=data, headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -614,30 +606,32 @@ class update_tenant_test(tenant_test): class delete_tenant_test(tenant_test): def test_delete_tenant_not_found(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant("test_tenant_delete111", + #resp,content=utils.create_tenant("test_tenant_delete", + # str(self.auth_token)) + resp, content = utils.delete_tenant("test_tenant_delete111", str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_tenant_not_found_xml(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_xml("test_tenant_delete111", + #resp,content=utils.create_tenant("test_tenant_delete", + # str(self.auth_token)) + resp, content = utils.delete_tenant_xml("test_tenant_delete111", str(self.auth_token)) self.assertEqual(404, int(resp['status'])) def test_delete_tenant(self): - resp, content = create_tenant("test_tenant_delete", + resp, content = utils.create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant("test_tenant_delete", + resp, content = utils.delete_tenant("test_tenant_delete", str(self.auth_token)) self.assertEqual(204, int(resp['status'])) def test_delete_tenant_xml(self): - resp, content = create_tenant_xml("test_tenant_delete", + resp, content = utils.create_tenant_xml("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_xml("test_tenant_delete", + resp, content = utils.delete_tenant_xml("test_tenant_delete", str(self.auth_token)) self.assertEqual(204, int(resp['status'])) if __name__ == '__main__': - unittest.main()
\ No newline at end of file + unittest.main() diff --git a/test/unit/test_token.py b/test/unit/test_token.py index c29061c2..ce33c8f1 100644 --- a/test/unit/test_token.py +++ b/test/unit/test_token.py @@ -4,34 +4,29 @@ import sys sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), '..', '..', '..', '..', 'keystone'))) import unittest -from webtest import TestApp import httplib2 -import json -from lxml import etree -import unittest -from webtest import TestApp -from test_common import * +import test_common as utils class validate_token(unittest.TestCase): def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() + self.token = utils.get_token('joeuser', 'secrete', 'token') + self.tenant = utils.get_tenant() + self.user = utils.get_user() + self.userdisabled = utils.get_userdisabled() + self.auth_token = utils.get_auth_token() + self.exp_auth_token = utils.get_exp_auth_token() + self.disabled_token = utils.get_disabled_token() def tearDown(self): - delete_token(self.token, self.auth_token) + utils.delete_token(self.token, self.auth_token) def test_validate_token_true(self): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) - resp, content = h.request(url, "GET", body='', + url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.token, self.tenant) + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -39,12 +34,12 @@ class validate_token(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_validate_token_true_xml(self): - h = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) - resp, content = h.request(url, "GET", body='', + header = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.token, self.tenant) + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.auth_token, "ACCEPT": "application/xml"}) @@ -53,29 +48,28 @@ class validate_token(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) + self.assertEqual('application/xml', utils.content_type(resp)) def test_validate_token_expired(self): - h = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, + header = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.exp_auth_token, self.tenant) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token} - ) + "X-Auth-Token": self.exp_auth_token}) if int(resp['status']) == 500: self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_validate_token_expired_xml(self): - h = httplib2.Http(".cache") + header = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, + url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.exp_auth_token, self.tenant) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/xml", "X-Auth-Token": self.exp_auth_token, "ACCEPT": "application/xml"}) @@ -84,13 +78,13 @@ class validate_token(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) + self.assertEqual('application/xml', utils.content_type(resp)) def test_validate_token_invalid(self): - h = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', + header = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (utils.URL, 'NonExistingToken', self.tenant) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) @@ -99,13 +93,13 @@ class validate_token(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_validate_token_invalid_xml(self): - h = httplib2.Http(".cache") - url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', + header = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (utils.URL, 'NonExistingToken', self.tenant) - resp, content = h.request(url, "GET", body='', + resp, content = header.request(url, "GET", body='', headers={"Content-Type": "application/json", "X-Auth-Token": self.auth_token}) if int(resp['status']) == 500: @@ -113,7 +107,7 @@ class validate_token(unittest.TestCase): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(401, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) if __name__ == '__main__': unittest.main() diff --git a/test/unit/test_version.py b/test/unit/test_version.py index b5e536a5..76b013ab 100644 --- a/test/unit/test_version.py +++ b/test/unit/test_version.py @@ -4,13 +4,9 @@ import sys sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), '..', '..', '..', '..', 'keystone'))) import unittest -from webtest import TestApp import httplib2 -import json -from lxml import etree -import unittest -from webtest import TestApp -from test_common import * +import test_common as utils + class version_test(unittest.TestCase): @@ -18,22 +14,19 @@ class version_test(unittest.TestCase): #here to call below method will call as last test case def test_a_get_version_json(self): - h = httplib2.Http(".cache") - url = URL - resp, content = h.request(url, "GET", body="", - headers={"Content-Type":"application/json"}) + header = httplib2.Http(".cache") + resp, content = header.request(utils.URL, "GET", body="", + headers={"Content-Type": "application/json"}) self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/json', content_type(resp)) + self.assertEqual('application/json', utils.content_type(resp)) def test_a_get_version_xml(self): - h = httplib2.Http(".cache") - url = URL - resp, content = h.request(url, "GET", body="", + header = httplib2.Http(".cache") + resp, content = header.request(utils.URL, "GET", body="", headers={"Content-Type": "application/xml", "ACCEPT": "application/xml"}) - self.assertEqual(200, int(resp['status'])) - self.assertEqual('application/xml', content_type(resp)) - + self.assertEqual('application/xml', utils.content_type(resp)) + if __name__ == '__main__': unittest.main() |
