summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjabdul <abdulkader.j@hcl.com>2011-05-12 20:28:40 +0530
committerjabdul <abdulkader.j@hcl.com>2011-05-12 20:28:40 +0530
commit921e981a1e18bb45033b7565026f58cb3ea7293e (patch)
tree6ed36be16f26b89f827a03cb50e5e9bae365b99d
parentb96d1716ff0057dcfe4905fef6d598160a23157c (diff)
parent1b4fc952f27a292a3f164e42b17d81b1be8bb596 (diff)
Merge branch test_users and test_common
Conflicts: test/unit/test_common.py test/unit/test_users.py
-rw-r--r--README72
-rw-r--r--README.md113
-rwxr-xr-xecho/bin/echod26
-rw-r--r--echo/echo/__init__.py18
-rw-r--r--echo/echo/server.py (renamed from echo/echo/echo.py)5
-rw-r--r--keystone/db/sqlalchemy/api.py7
-rw-r--r--keystone/logic/service.py33
-rw-r--r--setup.py2
-rw-r--r--test/unit/test_authentication.py64
-rw-r--r--test/unit/test_common.py184
-rw-r--r--test/unit/test_exthandler.py49
-rw-r--r--test/unit/test_groups.py662
-rw-r--r--test/unit/test_keystone.py24
-rw-r--r--test/unit/test_tenant_groups.py888
-rw-r--r--test/unit/test_tenants.py334
-rw-r--r--test/unit/test_token.py74
-rw-r--r--test/unit/test_version.py27
17 files changed, 1357 insertions, 1225 deletions
diff --git a/README b/README
index 0a7ce0dc..2dd74d01 100644
--- a/README
+++ b/README
@@ -1,3 +1,4 @@
+
Keystone: Identity Service
==========================
@@ -60,13 +61,22 @@ Or using pip:
RUNNING KEYSTONE:
-----------------
-From the topdir
+From the topdir
+
+ $ bin/keystone-control --pid-file pidfile auth <start|stop|restart>
+
+config.py takes the config file from topdir/etc/keystone.conf
- $ bin/keystone-control --config-file etc/keystone.conf --pid-file=pidfile auth <start|stop|restart>
+If the keystone package is also intalled on the system
+/etc/keystone.conf or /etc/keystone/keystone.conf has higher priority
+than <top_dir>/etc/keystone.conf. If you are also doing development on a
+system that has keystone.conf installed in /etc/you need to disambiguate it by
-The "start" command invokes bin/keystone-auth. During development you can also run
+ $ bin/keystone-control --confg-file etc/keystone.conf --pid-file pidfile auth <start|stop|restart>
- $ bin/keystone-auth etc/keystone.conf
+Also, keystone-control calls keystone-auth and it need to be in the PATH
+
+ $ export PATH=<top_dir>/bin:$PATH
@@ -90,7 +100,6 @@ DEMO CLIENT:
$ cd echo/echo
$ python echo_client.py
-
INSTALLING KEYSTONE:
--------------------
@@ -98,11 +107,6 @@ INSTALLING KEYSTONE:
$ sudo python setup.py install
-RUNNING KEYSTONE (Eventlet Server):
------------------------------------
- sudo keystone (start|stop|restart)
-
-
INSTALLING TEST SERVICE:
------------------------
@@ -124,16 +128,11 @@ To clean the test database
$ sqlite3 keystone/keystone.db < test/kill.sql
-To run unit tests:
-
- $ python test/unit/test_identity.py
-
To run client demo (with all auth middleware running locally on sample service):
$ python echo/echo/echo.py
$ python echo/echo/echo_client.py
-
To perform contract validation and load testing, use SoapUI (for now).
Using SOAPUI:
@@ -152,45 +151,15 @@ Unit Test on Identity Services
In order to run the unit test on identity services start the auth sever
$ cd test/unit
- $ ../../bin/keystone-control --config-file ../..etc/keystone.conf --pid-file=pidfile auth start
+ $ ../../bin/keystone-auth
-Once the Identity service is running, go to unit test/unit directory
+There are 8 groups of tests. They can be run individually or as an entire colection. To run the entire test suite run
- $ python test_identity.py
+ $ python test_keystone
-You can run a sbuset of tests the following way
- $ grep class test_identity.py
-
-You get something like
-
-
-class identity_test(unittest.TestCase):
-class authorize_test(identity_test):
-class validate_token(authorize_test):
-class tenant_test(unittest.TestCase):
-class create_tenant_test(tenant_test):
-class get_tenants_test(tenant_test):
-class get_tenant_test(tenant_test):
-class update_tenant_test(tenant_test):
-class delete_tenant_test(tenant_test):
-class tenant_group_test(unittest.TestCase):
-class create_tenant_group_test(tenant_group_test):
-class get_tenant_groups_test(tenant_group_test):
-class get_tenant_group_test(tenant_group_test):
-class update_tenant_group_test(tenant_group_test):
-class delete_tenant_group_test(tenant_test):
-class global_group_test(unittest.TestCase):
-class create_global_group_test(global_group_test):
-class create_tenant_group_test(tenant_group_test):
-
-You can choose any class you like to test
-
- $ python test_identity.py delete_tenant_test
-
-For more on unit testing please refer
-
- $ python test_identity --help
+A test can also be run individually e.g.
+ $ python test_token
DATABASE SCHEMA
@@ -205,6 +174,3 @@ DATABASE SCHEMA
-
-
-
diff --git a/README.md b/README.md
new file mode 100644
index 00000000..0fb7c64e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,113 @@
+Keystone: Identity Service
+==========================
+
+Keystone is a proposed independent authentication service for [OpenStack](http://www.openstack.org).
+
+This initial proof of concept aims to address the current use cases in Swift and Nova which are:
+
+* REST-based, token auth for Swift
+* many-to-many relationship between identity and tenant for Nova.
+
+
+SERVICES:
+---------
+
+* Keystone - authentication service
+* Auth_Token - WSGI middleware that can be used to handle token auth protocol (WSGI or remote proxy)
+* Echo - A sample service that responds by returning call details
+
+Also included:
+
+* Auth_Basic - Stub for WSGI middleware that will be used to handle basic auth
+* Auth_OpenID - Stub for WSGI middleware that will be used to handle openid auth protocol
+* RemoteAuth - WSGI middleware that can be used in services (like Swift, Nova, and Glance) when Auth middleware is running remotely
+
+
+ENVIRONMENT & DEPENDENCIES:
+---------------------------
+see pip-requires for dependency list
+Setup:
+Install http://pypi.python.org/pypi/setuptools
+ sudo easy_install pip
+ sudo pip install -r pip-requires
+
+
+RUNNING KEYSTONE:
+-----------------
+
+ $ cd bin
+ $ ./keystoned
+
+
+RUNNING TEST SERVICE:
+---------------------
+
+ Standalone stack (with Auth_Token)
+ $ cd echo/bin
+ $ ./echod
+
+ Distributed stack (with RemoteAuth local and Auth_Token remote)
+ $ cd echo/bon
+ $ ./echod --remote
+
+ in separate session
+ $ cd keystone/auth_protocols
+ $ python auth_token.py --remote
+
+
+DEMO CLIENT:
+---------------------
+ $ cd echo/echo
+ $ python echo_client.py
+ Note: this requires tests data. See section TESTING for initializing data
+
+
+TESTING
+-------
+
+After starting keystone a keystone.db sqlite database should be created in the keystone folder.
+
+Add test data to the database:
+
+ $ sqlite3 keystone/keystone.db < test/test_setup.sql
+
+To clean the test database
+
+ $ sqlite3 keystone/keystone.db < test/kill.sql
+
+To run unit tests:
+
+ $ python test/unit/test_identity.py
+
+To run client demo (with all auth middleware running locally on sample service):
+
+ $ ./echo/bin/echod
+ $ python echo/echo/echo_client.py
+
+
+To perform contract validation and load testing, use SoapUI (for now).
+
+Using SOAPUI:
+
+Download [SOAPUI](http://sourceforge.net/projects/soapui/files/):
+
+To Test Keystone Service:
+
+* File->Import Project
+* Select tests/IdentitySOAPUI.xml
+* Double click on "Keystone Tests" and press the green play (>) button
+
+
+Unit Test on Identity Services
+------------------------------
+In order to run the unit test on identity services:
+* start the keystone server
+* cat test_setup.sql |sqlite ../../keystone/keystone.db
+* go to unit test/unit directory
+* python test_identity.py
+
+For more on unit testing please refer
+
+ python test_identity --help
+
+
diff --git a/echo/bin/echod b/echo/bin/echod
new file mode 100755
index 00000000..d20a1b5e
--- /dev/null
+++ b/echo/bin/echod
@@ -0,0 +1,26 @@
+#!/bin/sh
+# Copyright (C) 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# If ../echo/__init__.py exists, add ../ to the Python search path so
+# that it will override whatever may be installed in the default Python
+# search path.
+script_dir=`dirname $0`
+if [ -f "$script_dir/../echo/__init__.py" ]
+then
+ PYTHONPATH="$script_dir/..:$PYTHONPATH"
+ export PYTHONPATH
+fi
+
+/usr/bin/env python -m echo.server $*
diff --git a/echo/echo/__init__.py b/echo/echo/__init__.py
index 52bcde04..3c39fdbc 100644
--- a/echo/echo/__init__.py
+++ b/echo/echo/__init__.py
@@ -1 +1,17 @@
-from echo import app_factory
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from server import app_factory
diff --git a/echo/echo/echo.py b/echo/echo/server.py
index e7fa38d3..8c24aa8f 100644
--- a/echo/echo/echo.py
+++ b/echo/echo/server.py
@@ -159,5 +159,6 @@ if __name__ == "__main__":
app = loadapp("config:" + \
os.path.join(os.path.abspath(os.path.dirname(__file__)),
ini), global_conf={"log_name": "echo.log"})
-
- wsgi.server(eventlet.listen(('', port)), app)
+ listener = eventlet.listen(('', port))
+ pool = eventlet.GreenPool(1000)
+ wsgi.server(listener, app, custom_pool=pool)
diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py
index f1817a24..9fd9a734 100644
--- a/keystone/db/sqlalchemy/api.py
+++ b/keystone/db/sqlalchemy/api.py
@@ -471,6 +471,13 @@ def token_for_user(user_id, session=None):
return result
+def token_for_user_tenant(user_id, tenant_id, session=None):
+ if not session:
+ session = get_session()
+ result = session.query(models.Token).filter_by(
+ user_id=user_id, tenant_id=tenant_id).order_by("expires desc").first()
+ return result
+
def user_tenant_create(values):
user_tenant_ref = models.UserTenantAssociation()
user_tenant_ref.update(values)
diff --git a/keystone/logic/service.py b/keystone/logic/service.py
index a51e3959..efe2f930 100644
--- a/keystone/logic/service.py
+++ b/keystone/logic/service.py
@@ -51,17 +51,26 @@ class IDMService(object):
# Look for an existing token, or create one,
# TODO: Handle tenant/token search
#
- dtoken = db_api.token_for_user(duser.id)
+ if not credentials.tenant_id:
+ dtoken = db_api.token_for_user(duser.id)
+ else:
+ dtoken = db_api.token_for_user_tenant(duser.id, credentials.tenant_id)
if not dtoken or dtoken.expires < datetime.now():
dtoken = db_models.Token()
dtoken.token_id = str(uuid.uuid4())
dtoken.user_id = duser.id
+
if not duser.tenants:
raise fault.IDMFault("Strange: user %s is not associated "
"with a tenant!" % duser.id)
- dtoken.tenant_id = duser.tenants[0].tenant_id
+ if not credentials.tenant_id and db_api.user_get_by_tenant(duser.id, credentials.tenant_id):
+ raise fault.IDMFault("Error: user %s is not associated "
+ "with a tenant! %s" % (duser.id,
+ credentials.tenant_id))
+ dtoken.tenant_id = credentials.tenant_id
+ else:
+ dtoken.tenant_id = duser.tenants[0].tenant_id
dtoken.expires = datetime.now() + timedelta(days=1)
-
db_api.token_create(dtoken)
return self.__get_auth_data(dtoken, duser)
@@ -81,6 +90,7 @@ class IDMService(object):
% user.id)
return self.__get_auth_data(token, user)
+
def revoke_token(self, admin_token, token_id):
self.__validate_token(admin_token)
@@ -116,6 +126,7 @@ class IDMService(object):
return tenant
+
##
## GET Tenants with Pagination
##
@@ -136,8 +147,6 @@ class IDMService(object):
if next:
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" \
% (url, next, limit)))
-
-
return tenants.Tenants(ts, links)
def get_tenant(self, admin_token, tenant_id):
@@ -146,7 +155,6 @@ class IDMService(object):
dtenant = db_api.tenant_get(tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant could not be found")
-
return tenants.Tenant(dtenant.id, dtenant.desc, dtenant.enabled)
def update_tenant(self, admin_token, tenant_id, tenant):
@@ -159,11 +167,8 @@ class IDMService(object):
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant cloud not be found")
-
values = {'desc': tenant.description, 'enabled': tenant.enabled}
-
db_api.tenant_update(tenant_id, values)
-
return tenants.Tenant(dtenant.id, tenant.description, tenant.enabled)
def delete_tenant(self, admin_token, tenant_id):
@@ -209,9 +214,7 @@ class IDMService(object):
dtenant.id = group.group_id
dtenant.desc = group.description
dtenant.tenant_id = tenant
-
db_api.tenant_group_create(dtenant)
-
return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
def get_tenant_groups(self, admin_token, tenantId, marker, limit, url):
@@ -849,7 +852,13 @@ class IDMService(object):
if len(duser.tenants) == 0:
raise fault.IDMFault("Strange: user %s is not associated "
"with a tenant!" % duser.id)
- user = auth.User(duser.id, duser.tenants[0].tenant_id, groups)
+ if not dtoken.tenant_id and \
+ db_api.user_get_by_tenant(duser.id, dtoken.tenant_id):
+ raise fault.IDMFault("Error: user %s is not associated "
+ "with a tenant! %s" % (duser.id,
+ dtoken.tenant_id))
+
+ user = auth.User(duser.id, dtoken.tenant_id, groups)
return auth.AuthData(token, user)
def __validate_token(self, token_id, admin=True):
diff --git a/setup.py b/setup.py
index f5ab870a..257f5f65 100644
--- a/setup.py
+++ b/setup.py
@@ -16,7 +16,7 @@
from setuptools import setup, find_packages
-version = '1.0'
+version = '1.0'
setup(
name='keystone',
diff --git a/test/unit/test_authentication.py b/test/unit/test_authentication.py
index 43838143..78bf0486 100644
--- a/test/unit/test_authentication.py
+++ b/test/unit/test_authentication.py
@@ -8,41 +8,41 @@ from webtest import TestApp
import httplib2
import json
from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+
+import test_common as utils
class authentication_test(unittest.TestCase):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.tenant = utils.get_tenant()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
def tearDown(self):
- delete_token(self.token, self.auth_token)
+ utils.delete_token(self.token, self.auth_token)
def test_a_authorize(self):
- resp, content = get_token('joeuser', 'secrete')
+ resp, content = utils.get_token('joeuser', 'secrete', '')
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_a_authorize_xml(self):
- resp, content = get_token_xml('joeuser', 'secrete')
+ resp, content = utils.get_token_xml('joeuser', 'secrete', '',
+ self.tenant)
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
+ self.assertEqual('application/xml', utils.content_type(resp))
def test_a_authorize_user_disabled(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
+ header = httplib2.Http(".cache")
+ url = '%stoken' % utils.URL
body = {"passwordCredentials": {"username": "disabled",
"password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json"})
content = json.loads(content)
if int(resp['status']) == 500:
@@ -50,17 +50,17 @@ class authentication_test(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_a_authorize_user_disabled_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
+ header = httplib2.Http(".cache")
+ url = '%stoken' % utils.URL
body = '<?xml version="1.0" encoding="UTF-8"?> \
<passwordCredentials \
xmlns="http://docs.openstack.org/idm/api/v1.0" \
password="secrete" username="disabled" \
/>'
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
content = etree.fromstring(content)
@@ -69,14 +69,14 @@ class authentication_test(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
+ self.assertEqual('application/xml', utils.content_type(resp))
def test_a_authorize_user_wrong(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
+ header = httplib2.Http(".cache")
+ url = '%stoken' % utils.URL
body = {"passwordCredentials": {"username-w": "disabled",
"password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json"})
content = json.loads(content)
if int(resp['status']) == 500:
@@ -84,17 +84,17 @@ class authentication_test(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(400, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_a_authorize_user_wrong_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
+ header = httplib2.Http(".cache")
+ url = '%stoken' % utils.URL
body = '<?xml version="1.0" encoding="UTF-8"?> \
<passwordCredentials \
xmlns="http://docs.openstack.org/idm/api/v1.0" \
password="secrete" username-w="disabled" \
/>'
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
content = etree.fromstring(content)
@@ -103,7 +103,7 @@ class authentication_test(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(400, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
+ self.assertEqual('application/xml', utils.content_type(resp))
if __name__ == '__main__':
- unittest.main() \ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_common.py b/test/unit/test_common.py
index 3b9197a8..79222674 100644
--- a/test/unit/test_common.py
+++ b/test/unit/test_common.py
@@ -8,100 +8,105 @@ from webtest import TestApp
import httplib2
import json
from lxml import etree
-import unittest
-from webtest import TestApp
+
URL = 'http://localhost:8080/v1.0/'
-def get_token(user, pswd, kind=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
+def get_token(user, pswd, kind='', tenant_id=None):
+ header = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ if not tenant_id:
body = {"passwordCredentials": {"username": user,
"password": pswd}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
-
- token = str(content['auth']['token']['id'])
- if kind == 'token':
- return token
- else:
- return (resp, content)
+ else:
+ body = {"passwordCredentials": {"username": user,
+ "password": pswd,
+ "tenantId": tenant_id}}
+
+ resp, content = header.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ token = str(content['auth']['token']['id'])
+ if kind == 'token':
+ return token
+ else:
+ return (resp, content)
def delete_token(token, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants' % (URL)
body = {"tenant": {"id": tenantid,
"description": "A description ...",
"enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenant/%s/groups' % (URL, tenantid)
body = {"group": {"id": groupid,
"description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='{}',
+ resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='{}',
+ resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups' % (URL)
body = {"group": {"id": groupid,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ "description": "A description ..."}}
+ resp, content = header.request(url, "POST", body=json.dumps(body),
+
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def create_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups' % (URL)
body = '<?xml version="1.0" encoding="UTF-8"?>\
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A Description of the group</description>\
</group>' % groupid
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -109,33 +114,39 @@ def create_global_group_xml(groupid, auth_token):
def delete_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='{}',
+ resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
return (resp, content)
-def get_token_xml(user, pswd, type=''):
- h = httplib2.Http(".cache")
+def get_token_xml(user, pswd, type='', tenant_id=None):
+ header = httplib2.Http(".cache")
url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="%s" username="%s" \
- tenantId="77654"/> ' % (pswd, user)
- resp, content = h.request(url, "POST", body=body,
+ if tenant_id:
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="%s" username="%s" \
+ tenantId="%s"/> ' % (pswd, user, tenant_id)
+ else:
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="%s" username="%s" /> ' % (pswd, user)
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
dom = etree.fromstring(content)
@@ -149,9 +160,9 @@ def get_token_xml(user, pswd, type=''):
def delete_token_xml(token, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -159,14 +170,14 @@ def delete_token_xml(token, auth_token):
def create_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants' % (URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % tenantid
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -174,14 +185,14 @@ def create_tenant_xml(tenantid, auth_token):
def create_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenant/%s/groups' % (URL, tenantid)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
<description>A description...</description> \
</group>' % groupid
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -189,9 +200,9 @@ def create_tenant_group_xml(groupid, tenantid, auth_token):
def delete_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -199,9 +210,9 @@ def delete_tenant_xml(tenantid, auth_token):
def delete_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -209,28 +220,39 @@ def delete_tenant_group_xml(groupid, tenantid, auth_token):
def create_user(tenantid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/users' % (URL, tenantid)
body = {"user": {"password": "secrete",
"id": userid,
"tenantId": tenantid,
"email": "%s@rackspace.com" % userid,
"enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_user(tenant, userid, auth_token):
+ header = httplib2.Http(".cache")
+ url = '%stenants/%s/users/%s' % (URL, tenant, userid)
+
+ resp, content = header.request(url, "DELETE", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
+
return (resp, content)
def create_user_xml(tenantid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/users' % (URL, tenantid)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<user xmlns="http://docs.openstack.org/idm/api/v1.0" \
email="joetest@rackspace.com" \
tenantId="%s" id="%s" \
enabled="true" password="secrete"/>' % (tenantid, userid)
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -248,9 +270,9 @@ def delete_user(tenant, userid, auth_token):
def delete_user_xml(tenantid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/users/%s' % (URL, tenantid, userid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -393,20 +415,20 @@ def users_group_get_xml(tenant_id, user_id, auth_token):
def add_user_tenant_group(tenantid, groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid)
- resp, content = h.request(url, "PUT", body='',
+ resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def add_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid)
- resp, content = h.request(url, "PUT", body='',
+ resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -414,20 +436,20 @@ def add_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
def delete_user_tenant_group(tenantid, groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users/%s' % (URL, tenantid, groupid, userid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -435,20 +457,20 @@ def delete_user_tenant_group_xml(tenantid, groupid, userid, auth_token):
def get_user_tenant_group(tenantid, groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users' % (URL, tenantid, groupid)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def get_user_tenant_group_xml(tenantid, groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%stenants/%s/groups/%s/users' % (URL, tenantid, groupid)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -456,20 +478,20 @@ def get_user_tenant_group_xml(tenantid, groupid, auth_token):
def add_user_global_group(groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL, groupid, userid)
- resp, content = h.request(url, "PUT", body='',
+ resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def add_user_global_group_xml(groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL, groupid, userid)
- resp, content = h.request(url, "PUT", body='',
+ resp, content = header.request(url, "PUT", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -477,20 +499,20 @@ def add_user_global_group_xml(groupid, userid, auth_token):
def delete_user_global_group(groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL, groupid, userid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
return (resp, content)
def delete_user_global_group_xml(groupid, userid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users/%s' % (URL, groupid, userid)
- resp, content = h.request(url, "DELETE", body='',
+ resp, content = header.request(url, "DELETE", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
@@ -498,10 +520,10 @@ def delete_user_global_group_xml(groupid, userid, auth_token):
def get_user_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users' % (URL, groupid)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
@@ -521,10 +543,10 @@ def get_email():
def get_user_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
url = '%sgroups/%s/users' % (URL, groupid)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": auth_token,
"ACCEPT": "application/xml"})
diff --git a/test/unit/test_exthandler.py b/test/unit/test_exthandler.py
new file mode 100644
index 00000000..4d09c148
--- /dev/null
+++ b/test/unit/test_exthandler.py
@@ -0,0 +1,49 @@
+import os
+import sys
+# Need to access identity module
+sys.path.append(os.path.abspath(os.path.join(
+ os.getcwd(), '..', '..', 'keystone')))
+from queryext.exthandler import UrlExtensionFilter
+import unittest
+
+
+class MockWsgiApp(object):
+
+ def __init__(self):
+ pass
+
+ def __call__(self, env, start_response):
+ pass
+
+
+def _start_response():
+ pass
+
+
+class UrlExtensionFilterTest(unittest.TestCase):
+
+ def setUp(self):
+ self.filter = UrlExtensionFilter(MockWsgiApp(), {})
+
+ def test_xml_extension(self):
+ env = {'PATH_INFO': '/v1.0/someresource.xml'}
+ self.filter(env, _start_response)
+ self.assertEqual('/v1.0/someresource', env['PATH_INFO'])
+ self.assertEqual('application/xml', env['HTTP_ACCEPT'])
+
+ def test_json_extension(self):
+ env = {'PATH_INFO': '/v1.0/someresource.json'}
+ self.filter(env, _start_response)
+ self.assertEqual('/v1.0/someresource', env['PATH_INFO'])
+ self.assertEqual('application/json', env['HTTP_ACCEPT'])
+
+ def test_extension_overrides_header(self):
+ env = {'PATH_INFO': '/v1.0/someresource.json',
+ 'HTTP_ACCEPT': 'application/xml'}
+ self.filter(env, _start_response)
+ self.assertEqual('/v1.0/someresource', env['PATH_INFO'])
+ self.assertEqual('application/json', env['HTTP_ACCEPT'])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/test/unit/test_groups.py b/test/unit/test_groups.py
index 1e6e4be1..1ca4dff3 100644
--- a/test/unit/test_groups.py
+++ b/test/unit/test_groups.py
@@ -8,39 +8,40 @@ from webtest import TestApp
import httplib2
import json
from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+
+import test_common as utils
##
## Global Group Tests
##
+
+
class global_group_test(unittest.TestCase):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.globaltenant = get_global_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.globaltenant = utils.get_global_tenant()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group_add'
def tearDown(self):
- resp, content = delete_global_group(self.global_group,
+ resp, content = utils.delete_global_group(self.global_group,
self.auth_token)
- resp, content = delete_tenant(self.globaltenant, self.auth_token)
+ resp, content = utils.delete_tenant(self.globaltenant, self.auth_token)
class create_global_group_test(global_group_test):
def test_global_group_create(self):
- respG, contentG = delete_global_group(self.global_group,
+ respG, contentG = utils.delete_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
-
+
if int(respG['status']) == 500:
self.fail('IDM fault')
elif int(respG['status']) == 503:
@@ -49,36 +50,34 @@ class create_global_group_test(global_group_test):
self.fail('Failed due to %d' % int(respG['status']))
def test_global_group_create_xml(self):
- respG, contentG = delete_global_group_xml(self.global_group,
+ respG, contentG = utils.delete_global_group_xml(self.global_group,
str(self.auth_token))
-
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
-
+
if int(respG['status']) == 500:
self.fail('IDM fault')
elif int(respG['status']) == 503:
self.fail('Service Not Available')
-
+
if int(respG['status']) not in (200, 201):
self.fail('Failed due to %d' % int(respG['status']))
def test_global_group_create_again(self):
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
if int(respG['status']) == 500:
self.fail('IDM fault')
elif int(respG['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(respG['status']))
-
def test_global_group_create_again_xml(self):
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
contentG = etree.fromstring(contentG)
if int(respG['status']) == 500:
@@ -86,13 +85,12 @@ class create_global_group_test(global_group_test):
elif int(respG['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(respG['status']))
-
def test_global_group_create_unauthorized_token(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = {"group": {"id": self.global_group,
"description": "A description ..."}}
resp, content = h.request(url, "POST", body=json.dumps(body),
@@ -106,7 +104,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_unauthorized_token_xml(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
@@ -124,13 +122,13 @@ class create_global_group_test(global_group_test):
def test_global_group_create_expired_token(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = {"group": {"id": self.global_group,
"description": "A description ..."}}
resp, content = h.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token
- })
+ "X-Auth-Token": \
+ self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -139,7 +137,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_expired_token_xml(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A description...</description> \
@@ -156,7 +154,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_missing_token(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = {"group": {"id": self.global_group,
"description": "A description ..."}}
resp, content = h.request(url, "POST", body=json.dumps(body),
@@ -169,7 +167,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_missing_token_xml(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A description...</description> \
@@ -185,13 +183,13 @@ class create_global_group_test(global_group_test):
def test_global_group_create_disabled_token(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.global_group
resp, content = h.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.disabled_token
- })
+ "X-Auth-Token": \
+ self.disabled_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -200,7 +198,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_disabled_token_xml(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A description...</description> \
@@ -217,7 +215,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_invalid_token(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.globaltenant
resp, content = h.request(url, "POST", body=body,
@@ -231,7 +229,7 @@ class create_global_group_test(global_group_test):
def test_global_group_create_invalid_token_xml(self):
h = httplib2.Http(".cache")
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A description...</description> \
@@ -251,12 +249,12 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups(self):
h = httplib2.Http(".cache")
- respG, contentG = delete_global_group(self.global_group,
+ respG, contentG = utils.delete_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
-
- url = '%sgroups' % (URL)
+
+ url = '%sgroups' % (utils.URL)
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
@@ -268,9 +266,9 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
@@ -283,9 +281,9 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups_unauthorized_token(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
@@ -298,9 +296,9 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups_unauthorized_token_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
@@ -314,14 +312,14 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups_exp_token(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token
- })
+ "X-Auth-Token": \
+ self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -330,9 +328,9 @@ class get_global_groups_test(global_group_test):
def test_get_global_groups_exp_token_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups' % (URL)
+ url = '%sgroups' % (utils.URL)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
@@ -349,9 +347,9 @@ class get_global_group_test(global_group_test):
def test_get_global_group(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, self.global_group)
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
@@ -364,9 +362,9 @@ class get_global_group_test(global_group_test):
def test_get_global_group_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, self.global_group)
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
@@ -380,9 +378,9 @@ class get_global_group_test(global_group_test):
def test_get_global_group_bad(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, 'global_group_bad')
+ url = '%sgroups/%s' % (utils.URL, 'global_group_bad')
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/json",
@@ -395,9 +393,9 @@ class get_global_group_test(global_group_test):
def test_get_global_group_bad_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL , 'global_group_bad')
+ url = '%sgroups/%s' % (utils.URL, 'global_group_bad')
#test for Content-Type = application/json
resp, content = h.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
@@ -409,15 +407,14 @@ class get_global_group_test(global_group_test):
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
-
class update_global_groups_test(global_group_test):
def test_update_global_group(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, self.global_group)
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
resp, content = h.request(url, "PUT", body='{"group":{\
"id" : "%s","description" :\
"A New description of the group..."}}' % self.global_group,
@@ -435,10 +432,10 @@ class update_global_groups_test(global_group_test):
def test_update_global_group_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
-
- url = '%sgroups/%s' % (URL, self.global_group)
+
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
data = u'<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A NEW description...</description> \
@@ -448,7 +445,7 @@ class update_global_groups_test(global_group_test):
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
-
+
body = etree.fromstring(content)
desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
if int(resp['status']) == 500:
@@ -461,9 +458,9 @@ class update_global_groups_test(global_group_test):
def test_update_global_group_bad(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, self.global_group)
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
data = '{"group": { "description_bad": "A NEW description...", \
"id":"%s" }}'\
% (self.global_group)
@@ -479,9 +476,9 @@ class update_global_groups_test(global_group_test):
def test_update_global_group_bad_xml(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group_xml(self.global_group,
+ respG, contentG = utils.create_global_group_xml(self.global_group,
str(self.auth_token))
- url = '%sgroups/%s' % (URL, self.global_group)
+ url = '%sgroups/%s' % (utils.URL, self.global_group)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description_bad>A NEW description...</description> \
@@ -500,9 +497,9 @@ class update_global_groups_test(global_group_test):
def test_update_global_group_not_found(self):
h = httplib2.Http(".cache")
- respG, contentG = create_global_group(self.global_group,
+ respG, contentG = utils.create_global_group(self.global_group,
str(self.auth_token))
- url = '%sgroups/NonexistingID' % (URL)
+ url = '%sgroups/NonexistingID' % (utils.URL)
data = '{"group": { "description": "A NEW description...", \
"id":"NonexistingID"}}'
#test for Content-Type = application/json
@@ -513,9 +510,9 @@ class update_global_groups_test(global_group_test):
def test_update_global_group_not_found_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.globaltenant,
+ resp, content = utils.create_tenant_xml(self.globaltenant,
str(self.auth_token))
- url = '%sgroups/NonexistingID' % (URL)
+ url = '%sgroups/NonexistingID' % (utils.URL)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="NonexistingID"> \
@@ -536,506 +533,467 @@ class update_global_groups_test(global_group_test):
class delete_global_group_test(global_group_test):
def test_delete_global_group_not_found(self):
- resp, content = delete_global_group("test_global_group_1",
+ resp, content = utils.delete_global_group("test_global_group_1",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_global_group_not_found_xml(self):
- resp, content = delete_global_group_xml("test_global_group_1",
+ resp, content = utils.delete_global_group_xml("test_global_group_1",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_global_group(self):
- resp, content = create_tenant(self.globaltenant, str(self.auth_token))
- respG, contentG = create_tenant_group('test_global_group_delete',
+ resp, content = utils.create_tenant(self.globaltenant,
+ str(self.auth_token))
+ respG, contentG = utils.create_tenant_group('test_global_group_delete',
self.globaltenant,
str(self.auth_token))
- respG, contentG = delete_global_group('test_global_group_delete',
+ respG, contentG = utils.delete_global_group('test_global_group_delete',
str(self.auth_token))
- resp, content = delete_tenant(self.globaltenant,
+ resp, content = utils.delete_tenant(self.globaltenant,
str(self.auth_token))
self.assertEqual(204, int(respG['status']))
def test_delete_global_group_xml(self):
- resp, content = create_tenant_xml(self.globaltenant,
- str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_global_group_delete',
+ resp, content = utils.create_tenant_xml(self.globaltenant,
+ str(self.auth_token))
+ respG, contentG = utils.create_tenant_group_xml(\
+ 'test_global_group_delete',
self.globaltenant,
str(self.auth_token))
- respG, contentG = delete_global_group_xml('test_global_group_delete',
+ respG, contentG = utils.delete_global_group_xml(\
+ 'test_global_group_delete',
str(self.auth_token))
- resp, content = delete_tenant_xml(self.globaltenant,
+ resp, content = utils.delete_tenant_xml(self.globaltenant,
str(self.auth_token))
self.assertEqual(204, int(resp['status']))
-
-
class add_user_global_group_test(unittest.TestCase):
-
+
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_global_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.tenant = utils.get_global_tenant()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
-
-
def tearDown(self):
- respG, contentG = delete_user_global_group(self.global_group,
+ respG, contentG = utils.delete_user_global_group(self.global_group,
self.user,
str(self.auth_token))
-
- respG, contentG = delete_user(self.tenant, self.user,
+
+ respG, contentG = utils.delete_user(self.tenant, self.user,
str(self.auth_token))
- resp, content = delete_global_group(self.global_group,
+ resp, content = utils.delete_global_group(self.global_group,
self.auth_token)
-
-
-
+
def test_add_user_global_group(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user, str(self.auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
if int(respG['status']) not in (200, 201):
self.fail('Failed due to %d' % int(respG['status']))
-
-
+
def test_add_user_global_group_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user, str(self.auth_token)
- )
-
-
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
if int(respG['status']) not in (200, 201):
self.fail('Failed due to %d' % int(respG['status']))
-
-
+
def test_add_user_global_group_conflict(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = add_user_global_group(self.global_group,
- self.user, str(self.auth_token)
- )
-
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(respG['status']))
-
+
def test_add_user_global_group_conflict_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user, str(self.auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(respG['status']))
-
+
def test_add_user_global_group_unauthorized(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user, str(self.token)
- )
-
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(respG['status']))
-
+
def test_add_user_global_group_unauthorized_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user, str(self.token)
- )
-
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(respG['status']))
-
+
def test_add_user_global_group_forbidden(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.disabled_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.disabled_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
+
def test_add_user_global_group_forbidden_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user,
- str(self.disabled_token)
- )
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
+
class get_users_tenant_group_test(unittest.TestCase):
-
+
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_global_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.tenant = utils.get_global_tenant()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
-
-
def tearDown(self):
- respG, contentG = delete_user_global_group(self.global_group,
+ respG, contentG = utils.delete_user_global_group(self.global_group,
self.user,
str(self.auth_token))
-
- respG, contentG = delete_user(self.tenant, self.user,
+
+ respG, contentG = utils.delete_user(self.tenant, self.user,
str(self.auth_token))
- resp, content = delete_global_group(self.global_group,
- self.auth_token)
+ resp, content = utils.delete_global_group(self.global_group,
+ self.auth_token)
+
def test_get_users_global_group(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group(self.global_group,
- str(self.auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group(self.global_group,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(respG['status']))
-
-
+
def test_get_users_global_group_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group_xml(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group_xml(self.global_group,
- str(self.auth_token)
- )
+ respG, contentG = utils.add_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group_xml(self.global_group,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(respG['status']))
-
-
+
def test_get_users_global_group_unauthorized(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
-
- respG, contentG = get_user_global_group(self.global_group,
- str(self.token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+
+ respG, contentG = utils.get_user_global_group(self.global_group,
+ str(self.token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(respG['status']))
-
+
def test_get_users_global_group_unauthorized_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group_xml(self.global_group,
- str(self.token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group_xml(self.global_group,
+ str(self.token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(respG['status']))
-
+
def test_get_users_global_group_forbidden(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group(self.global_group,
- str(self.disabled_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group(self.global_group,
+ str(self.disabled_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
+
def test_get_users_global_group_forbidden_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group_xml(self.global_group,
- str(self.disabled_token)
- )
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group_xml(self.global_group,
+ str(self.disabled_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
+
def test_get_users_global_group_expired(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group(self.global_group,
- str(self.exp_auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group(self.global_group,
+ str(self.exp_auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
+
def test_get_users_global_group_expired_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = get_user_global_group_xml(self.global_group,
- str(self.exp_auth_token)
- )
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.get_user_global_group_xml(self.global_group,
+ str(self.exp_auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(respG['status']))
-
-class delete_users_global_group_test(unittest.TestCase):
-
+
+
+class delete_users_global_group_test(unittest.TestCase):
+
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_global_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.tenant = utils.get_global_tenant()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
self.global_group = 'test_global_group'
-
-
def tearDown(self):
- respG, contentG = delete_user_global_group(self.global_group,
+ respG, contentG = utils.delete_user_global_group(self.global_group,
self.user,
str(self.auth_token))
-
- respG, contentG = delete_user(self.tenant, self.user,
+
+ respG, contentG = utils.delete_user(self.tenant, self.user,
str(self.auth_token))
- resp, content = delete_global_group(self.global_group,
+ resp, content = utils.delete_global_group(self.global_group,
self.auth_token)
+
def test_delete_user_global_group(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
-
- respG, contentG = delete_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+
+ respG, contentG = utils.delete_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(respG['status']))
-
-
+
def test_delete_user_global_group_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = delete_user_global_group_xml(self.global_group,
- self.user,
- str(self.auth_token)
- )
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.delete_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(204, int(respG['status']))
-
+
def test_delete_user_global_group_notfound(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.disabled_token)
- )
- respG, contentG = delete_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = delete_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.disabled_token))
+ respG, contentG = utils.delete_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.delete_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(respG['status']))
-
+
def test_delete_user_global_group_notfound_xml(self):
h = httplib2.Http(".cache")
- resp, content = create_global_group(self.global_group,
+ resp, content = utils.create_global_group(self.global_group,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ respG, contentG = utils.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_global_group(self.global_group,
- self.user,
- str(self.disabled_token)
- )
- respG, contentG = delete_user_global_group(self.global_group,
- self.user,
- str(self.auth_token)
- )
- respG, contentG = delete_user_global_group_xml(self.global_group,
- self.user,
- str(self.auth_token)
- )
-
+ respG, contentG = utils.add_user_global_group(self.global_group,
+ self.user,
+ str(self.disabled_token))
+ respG, contentG = utils.delete_user_global_group(self.global_group,
+ self.user,
+ str(self.auth_token))
+ respG, contentG = utils.delete_user_global_group_xml(self.global_group,
+ self.user,
+ str(self.auth_token))
+
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(respG['status']))
+
if __name__ == '__main__':
- unittest.main() \ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_keystone.py b/test/unit/test_keystone.py
index 31b6ae32..9a6fe4df 100644
--- a/test/unit/test_keystone.py
+++ b/test/unit/test_keystone.py
@@ -1,9 +1,11 @@
-#TODO (India Team) Need to modify this script
import logging
import os
import unittest
-
-MODULE_EXTENSIONS = set('.py .pyc .pyo'.split())
+from lxml import etree
+MODULE_EXTENSIONS = set('.py'.split())
+TEST_FILES = ['test_authentication.py', 'test_keystone.py', 'test_tenants.py',
+ 'test_common.py', 'test_users.py','test_tenant_groups.py',
+ 'test_token.py', 'test_version.py', 'test_groups.py']
def unit_test_extractor(tup, path, filenames):
"""Pull ``unittest.TestSuite``s from modules in path
@@ -16,17 +18,19 @@ def unit_test_extractor(tup, path, filenames):
relpath = os.path.relpath(path, package_path)
relpath_pieces = relpath.split(os.sep)
- if relpath_pieces[0] == '.': # Base directory.
- relpath_pieces.pop(0) # Otherwise, screws up module name.
+ if relpath_pieces[0] == '.': # Base directory.
+ relpath_pieces.pop(0) # Otherwise, screws up module name.
elif not any(os.path.exists(os.path.join(path, '__init__' + ext))
for ext in MODULE_EXTENSIONS):
- return # Not a package directory and not the base directory, reject.
+ return # Not a package directory and not the base directory, reject.
logging.info('Base: %s', '.'.join(relpath_pieces))
for filename in filenames:
- base, ext = os.path.splitext(filename)
- if ext not in MODULE_EXTENSIONS: # Not a Python module.
+ if filename not in TEST_FILES:
continue
+ base, ext = os.path.splitext(filename)
+ #if ext not in MODULE_EXTENSIONS : # Not a Python module.
+ # continue
logging.info('Module: %s', base)
module_name = '.'.join(relpath_pieces + [base])
logging.info('Importing from %s', module_name)
@@ -35,6 +39,7 @@ def unit_test_extractor(tup, path, filenames):
logging.info('Got suites: %s', module_suites)
suites += module_suites
+
def get_test_suites(path):
""":return: Iterable of suites for the packages/modules
present under :param:`path`.
@@ -50,5 +55,4 @@ if __name__ == '__main__':
package_path = os.path.dirname(os.path.abspath(__file__))
suites = get_test_suites(package_path)
for suite in suites:
- unittest.TextTestRunner(verbosity=2).run(suite)
-
+ unittest.TextTestRunner(verbosity=1).run(suite)
diff --git a/test/unit/test_tenant_groups.py b/test/unit/test_tenant_groups.py
index f15bd909..9c46db42 100644
--- a/test/unit/test_tenant_groups.py
+++ b/test/unit/test_tenant_groups.py
@@ -4,130 +4,120 @@ import sys
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', 'keystone')))
import unittest
-from webtest import TestApp
import httplib2
import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+from lxml import etree
+import test_common as util
class tenant_group_test(unittest.TestCase):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = util.get_token('joeuser', 'secrete', 'token')
+ self.tenant = util.get_tenant()
+ self.user = util.get_user()
+ self.userdisabled = util.get_userdisabled()
+ self.auth_token = util.get_auth_token()
+ self.exp_auth_token = util.get_exp_auth_token()
+ self.disabled_token = util.get_disabled_token()
self.tenant_group = 'test_tenant_group_add'
def tearDown(self):
- resp, content = delete_tenant_group(self.tenant_group,
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
+ resp, content = util.delete_tenant(self.tenant, self.auth_token)
class create_tenant_group_test(tenant_group_test):
def test_tenant_group_create(self):
- resp, content = delete_tenant(self.tenant, str(self.auth_token))
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = delete_tenant_group(self.tenant_group,
+ resp, content = util.delete_tenant(self.tenant, str(self.auth_token))
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
+ if int(resp['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(resp['status']))
def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml(self.tenant, str(self.auth_token))
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- respG, contentG = delete_tenant_group_xml(self.tenant_group,
+ resp, content = util.delete_tenant_xml(self.tenant,
+ str(self.auth_token))
+ resp, content = util.create_tenant_xml(self.tenant,
+ str(self.auth_token))
+ resp, content = util.delete_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,
+ resp, content = util.create_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
- self.tenant = self.tenant
- self.tenant_group = self.tenant_group
+
content = etree.fromstring(content)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
+ if int(resp['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(resp['status']))
def test_tenant_group_create_again(self):
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant,
+ str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- if int(respG['status']) == 200:
+ if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
+ self.tenant_group = content['group']['id']
+ if int(resp['status']) == 500:
self.fail('IDM fault')
- elif int(respG['status']) == 503:
+ elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
+ self.assertEqual(409, int(resp['status']))
def test_tenant_group_create_again_xml(self):
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,
+ resp, content = util.create_tenant_xml("test_tenant",
+ str(self.auth_token))
+ resp, content = util.create_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,
+ resp_new, content_new = util.create_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
- if int(respG['status']) == 500:
+ if int(resp['status']) == 500:
self.fail('IDM fault')
- elif int(respG['status']) == 503:
+ elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
+ self.assertEqual(409, int(resp['status']))
def test_tenant_group_create_unauthorized_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+ resp, content = util.create_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
+
if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ self.tenant_group = resp['group']['id']
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = {"group": {"id": self.tenant_group,
"description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": self.token})
if int(resp['status']) == 500:
@@ -137,16 +127,16 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_group_create_unauthorized_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"><description>A description...</description> \
</group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.token,
"ACCEPT": "application/xml"})
@@ -157,14 +147,14 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = {"group": {"id": self.tenant_group,
"description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
@@ -174,18 +164,19 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
<description>A description...</description> \
</group>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
@@ -196,14 +187,14 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = {"group": {"id": self.tenant_group,
"description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json"})
if int(resp['status']) == 500:
self.fail('IDM fault')
@@ -211,20 +202,20 @@ class create_tenant_group_test(tenant_group_test):
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
-
def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
<description>A description...</description> \
</group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
@@ -233,19 +224,18 @@ class create_tenant_group_test(tenant_group_test):
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
-
def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.disabled_token})
+ "X-Auth-Token": self.disabled_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
@@ -254,19 +244,20 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
<description>A description...</description> \
</group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.disabled_token,
"ACCEPT": "application/xml"})
@@ -277,15 +268,15 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '{"group": { "id": "%s", \
"description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
"X-Auth-Token": 'nonexsitingtoken'})
@@ -296,19 +287,20 @@ class create_tenant_group_test(tenant_group_test):
self.assertEqual(404, int(resp['status']))
def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="%s"> \
<description>A description...</description> \
</group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": 'nonexsitingtoken',
"ACCEPT": "application/xml"})
@@ -323,16 +315,16 @@ class create_tenant_group_test(tenant_group_test):
class get_tenant_groups_test(tenant_group_test):
def test_get_tenant_groups(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -342,14 +334,14 @@ class get_tenant_groups_test(tenant_group_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_groups_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,
+ resp, content = util.create_tenant_group_xml(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
- resp, content = h.request(url, "GET", body='',
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -360,15 +352,15 @@ class get_tenant_groups_test(tenant_group_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_groups_unauthorized_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.token})
if int(resp['status']) == 500:
@@ -378,14 +370,14 @@ class get_tenant_groups_test(tenant_group_test):
self.assertEqual(401, int(resp['status']))
def test_get_tenant_groups_unauthorized_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.token,
"ACCEPT": "application/xml"})
@@ -396,16 +388,16 @@ class get_tenant_groups_test(tenant_group_test):
self.assertEqual(401, int(resp['status']))
def test_get_tenant_groups_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
+ "X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -413,14 +405,14 @@ class get_tenant_groups_test(tenant_group_test):
self.assertEqual(403, int(resp['status']))
def test_get_tenant_groups_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups' % (URL, self.tenant)
+ url = '%stenant/%s/groups' % (util.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
@@ -434,14 +426,15 @@ class get_tenant_groups_test(tenant_group_test):
class get_tenant_group_test(tenant_group_test):
def test_get_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -451,14 +444,15 @@ class get_tenant_group_test(tenant_group_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
#test for Content-Type = application/xml
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -469,15 +463,16 @@ class get_tenant_group_test(tenant_group_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, 'tenant_bad', self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, 'tenant_bad',
+ self.tenant_group)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',
+ resp, content = header.request(url, "GET", body='{',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -487,14 +482,15 @@ class get_tenant_group_test(tenant_group_test):
self.assertEqual(404, int(resp['status']))
def test_get_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, 'tenant_bad', self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, 'tenant_bad',
+ self.tenant_group)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',
+ resp, content = header.request(url, "GET", body='{',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -505,32 +501,34 @@ class get_tenant_group_test(tenant_group_test):
self.assertEqual(404, int(resp['status']))
def test_get_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, 'nonexistinggroup')
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ 'nonexistinggroup')
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
- self.fail('IDM fault')
+ self.fail('IDM fault')
elif int(resp['status']) == 503:
- self.fail('Service Not Available')
+ self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
def test_get_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, 'nonexistinggroup')
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ 'nonexistinggroup')
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -544,25 +542,25 @@ class get_tenant_group_test(tenant_group_test):
class update_tenant_group_test(tenant_group_test):
def test_update_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
data = '{"group": { "id":"%s","description": "A NEW description..." ,\
"tenantId":"%s" }}' % (self.tenant_group, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
-
body = json.loads(content)
if int(resp['status']) == 500:
self.fail('IDM fault')
@@ -573,20 +571,21 @@ class update_tenant_group_test(tenant_group_test):
self.assertEqual('A NEW description...', body['group']['description'])
def test_update_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = delete_tenant(self.tenant, str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = util.delete_tenant(self.tenant, str(self.auth_token))
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant , self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
data = '<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
tenantId="%s" id="%s"> \
@@ -594,12 +593,11 @@ class update_tenant_group_test(tenant_group_test):
</group>' % (self.tenant, self.tenant_group)
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
-
body = etree.fromstring(content)
desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
@@ -613,20 +611,21 @@ class update_tenant_group_test(tenant_group_test):
self.assertEqual('A NEW description...', desc.text)
def test_update_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
data = '{"group": { "description_bad": "A NEW description...",\
"id":"%s","tenantId":"%s" }}' % (self.tenant_group, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -636,22 +635,23 @@ class update_tenant_group_test(tenant_group_test):
self.assertEqual(400, int(resp['status']))
def test_update_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant, self.tenant_group)
+ url = '%stenant/%s/groups/%s' % (util.URL, self.tenant,
+ self.tenant_group)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
tenantId="%s" id="%s"> \
<description_bad>A NEW description...</description> \
</group>' % (self.tenant, self.tenant_group)
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -662,20 +662,20 @@ class update_tenant_group_test(tenant_group_test):
self.assertEqual(400, int(resp['status']))
def test_update_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+ url = '%stenant/%s/groups/NonexistingID' % (util.URL, self.tenant)
data = '{"group": { "description": "A NEW description...",\
"id":"NonexistingID", "tenantId"="test_tenant" }}'
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,
+ resp, content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -685,16 +685,16 @@ class update_tenant_group_test(tenant_group_test):
self.assertEqual(404, int(resp['status']))
def test_update_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/NonexistingID' % (util.URL, self.tenant)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<group xmlns="http://docs.openstack.org/idm/api/v1.0" \
id="NonexistingID", "tenant_id"="test_tenant"> \
<description_bad>A NEW description...</description> \
</group>'
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,
+ resp, content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -708,103 +708,98 @@ class update_tenant_group_test(tenant_group_test):
class delete_tenant_group_test(tenant_group_test):
def test_delete_tenant_group_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group("test_tenant_delete111",
+ resp, content = util.delete_tenant_group("test_tenant_delete111",
self.tenant,
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_tenant_group_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group_xml("test_tenant_delete111",
+ resp, content = util.delete_tenant_group_xml("test_tenant_delete111",
self.tenant,
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_tenant_group(self):
- resp, content = create_tenant("test_tenant_delete",
+ resp, content = util.create_tenant("test_tenant_delete",
str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete',
+ resp, content = util.create_tenant_group('test_tenant_group_delete',
"test_tenant_delete",
str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete',
+ resp, content = util.delete_tenant_group('test_tenant_group_delete',
"test_tenant_delete",
str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete",
+ resp, content = util.delete_tenant("test_tenant_delete",
str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
+ self.assertEqual(204, int(resp['status']))
def test_delete_tenant_group_xml(self):
- resp, content = create_tenant("test_tenant_delete",
+ resp, content = util.create_tenant("test_tenant_delete",
str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete',
+ resp, content = util.create_tenant_group('test_tenant_group_delete',
"test_tenant_delete",
str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete',
+ resp, content = util.delete_tenant_group('test_tenant_group_delete',
"test_tenant_delete",
str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete",
+ resp, content = util.delete_tenant_xml("test_tenant_delete",
str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
+ self.assertEqual(204, int(resp['status']))
class add_user_tenant_group_test(tenant_group_test):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
+ self.token = util.get_token('joeuser', 'secrete', 'token')
self.tenant = 'test_tenant'
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.user = util.get_user()
+ self.userdisabled = util.get_userdisabled()
+ self.auth_token = util.get_auth_token()
+ self.exp_auth_token = util.get_exp_auth_token()
+ self.disabled_token = util.get_disabled_token()
self.tenant_group = 'test_tenant_group_add'
def tearDown(self):
- respG, contentG = delete_user_tenant_group(self.tenant,
+ resp, content = util.delete_user_tenant_group(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
- respG, contentG = delete_user(self.tenant, self.user,
+ resp, content = util.delete_user(self.tenant, self.user,
str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
+ resp, content = util.delete_tenant(self.tenant, self.auth_token)
def test_add_user_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
+ if int(resp['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(resp['status']))
def test_add_user_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
- self.tenant,
- str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
+ self.tenant,
+ str(self.auth_token))
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant,
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
@@ -813,403 +808,388 @@ class add_user_tenant_group_test(tenant_group_test):
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
+ if int(resp['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(resp['status']))
def test_add_user_tenant_group_conflict(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
-
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
+ self.assertEqual(409, int(resp['status']))
def test_add_user_tenant_group_conflict_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
+ self.tenant_group,
+ self.user, str(self.auth_token))
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
+ self.assertEqual(409, int(resp['status']))
def test_add_user_tenant_group_unauthorized(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.token)
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user, self.token)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(401, int(respG['status']))
+ self.assertEqual(401, int(resp['status']))
def test_add_user_tenant_group_unauthorized_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
- str(self.auth_token))
+ resp, content = util.create_user(self.tenant, self.user,
+ str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group,
- self.user, self.token)
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
+ self.tenant_group,
+ self.user, self.token)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(401, int(respG['status']))
+ self.assertEqual(401, int(resp['status']))
def test_add_user_tenant_group_forbidden(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
- self.tenant,
- str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
+ self.tenant,
+ str(self.auth_token))
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.disabled_token)
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ self.disabled_token)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
+ self.assertEqual(403, int(resp['status']))
def test_add_user_tenant_group_forbidden_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant, self.tenant_group,
- self.user, self.disabled_token)
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
+ self.tenant_group,
+ self.user,
+ self.disabled_token)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
+ self.assertEqual(403, int(resp['status']))
class get_users_tenant_group_test(tenant_group_test):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
+ self.token = util.get_token('joeuser', 'secrete', 'token')
self.tenant = 'test_tenant'
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.user = util.get_user()
+ self.userdisabled = util.get_userdisabled()
+ self.auth_token = util.get_auth_token()
+ self.exp_auth_token = util.get_exp_auth_token()
+ self.disabled_token = util.get_disabled_token()
self.tenant_group = 'test_tenant_group_add'
def tearDown(self):
- respG, contentG = delete_user_tenant_group(self.tenant,
+ resp, content = util.delete_user_tenant_group(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
- respG, contentG = delete_user(self.tenant, self.user,
+ resp, content = util.delete_user(self.tenant, self.user,
str(self.auth_token))
- resp, content = delete_tenant_group(self.tenant_group,
+ resp, content = util.delete_tenant_group(self.tenant_group,
self.tenant,
self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
+ resp, content = util.delete_tenant(self.tenant, self.auth_token)
def test_get_users_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group,
- str(self.auth_token)
- )
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
+ resp, content = util.get_user_tenant_group(self.tenant,
+ self.tenant_group,
+ str(self.auth_token))
+
+ self.assertEqual(200, int(resp['status']))
+ def test_get_users_tenant_group_xml(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(200, int(respG['status']))
-
-
- def test_get_users_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant,
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
- respG, contentG = get_user_tenant_group_xml(self.tenant,
+ resp, content = util.get_user_tenant_group_xml(self.tenant,
self.tenant_group,
- str(self.auth_token)
- )
+ str(self.auth_token))
+
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_users_tenant_group_unauthorized(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(200, int(respG['status']))
-
-
- def test_get_users_tenant_group_unauthorized(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.auth_token)
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ self.auth_token)
- respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group,
- str(self.token)
- )
+ resp, content = util.get_user_tenant_group(self.tenant,
+ self.tenant_group,
+ str(self.token))
+ self.assertEqual(401, int(resp['status']))
+ def test_get_users_tenant_group_unauthorized_xml(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(401, int(respG['status']))
- def test_get_users_tenant_group_unauthorized_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.auth_token)
- respG, contentG = get_user_tenant_group_xml(self.tenant,
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user, self.auth_token)
+ resp, content = util.get_user_tenant_group_xml(self.tenant,
self.tenant_group,
- str(self.token)
- )
+ str(self.token))
+ self.assertEqual(401, int(resp['status']))
+ def test_get_users_tenant_group_forbidden(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(401, int(respG['status']))
-
- def test_get_users_tenant_group_forbidden(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.auth_token)
- respG, contentG = get_user_tenant_group(self.tenant,
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user, self.auth_token)
+ resp, content = util.get_user_tenant_group(self.tenant,
self.tenant_group,
- str(self.disabled_token)
- )
+ str(self.disabled_token))
+
+ self.assertEqual(403, int(resp['status']))
+ def test_get_users_tenant_group_forbidden_xml(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
-
- def test_get_users_tenant_group_forbidden_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.auth_token)
- respG, contentG = get_user_tenant_group_xml(self.tenant,
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user, self.auth_token)
+ resp, content = util.get_user_tenant_group_xml(self.tenant,
self.tenant_group,
- str(self.disabled_token)
- )
+ str(self.disabled_token))
+
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_users_tenant_group_expired(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
-
- def test_get_users_tenant_group_expired(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, self.auth_token)
- respG, contentG = get_user_tenant_group(self.tenant, self.tenant_group,
- str(self.exp_auth_token)
- )
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user, self.auth_token)
+ resp, content = util.get_user_tenant_group(self.tenant,
+ self.tenant_group,
+ str(self.exp_auth_token))
+ self.assertEqual(403, int(resp['status']))
+ def test_get_users_tenant_group_expired_xml(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
-
- def test_get_users_tenant_group_expired_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
self.user, self.auth_token)
- respG, contentG = get_user_tenant_group_xml(self.tenant,
+ resp, content = util.get_user_tenant_group_xml(self.tenant,
self.tenant_group,
- str(self.exp_auth_token)
- )
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(respG['status']))
+ str(self.exp_auth_token))
+
+ self.assertEqual(403, int(resp['status']))
+
class delete_users_tenant_group_test(tenant_group_test):
def test_delete_user_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group(self.tenant, self.tenant_group,
- self.user, str(self.auth_token)
- )
- respG, contentG = delete_user_tenant_group(self.tenant,
+ resp, content = util.add_user_tenant_group(self.tenant,
+ self.tenant_group,
+ self.user,
+ str(self.auth_token))
+ resp, content = util.delete_user_tenant_group(self.tenant,
self.tenant_group,
self.user,
- str(self.auth_token)
- )
+ str(self.auth_token))
+ self.assertEqual(204, int(resp['status']))
+
+ def test_delete_user_tenant_group_xml(self):
+ resp, content = util.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(204, int(respG['status']))
-
-
- def test_delete_user_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,
+ resp, content = util.create_tenant_group(self.tenant_group,
self.tenant,
str(self.auth_token))
- respG, contentG = create_user(self.tenant, self.user,
+ resp, content = util.create_user(self.tenant, self.user,
str(self.auth_token))
- respG, contentG = add_user_tenant_group_xml(self.tenant,
+ resp, content = util.add_user_tenant_group_xml(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
- respG, contentG = delete_user_tenant_group_xml(self.tenant,
+ resp, content = util.delete_user_tenant_group_xml(self.tenant,
self.tenant_group,
self.user,
str(self.auth_token))
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(204, int(respG['status']))
+ self.assertEqual(204, int(resp['status']))
def test_delete_user_tenant_group_notfound(self):
- h = httplib2.Http(".cache")
-
- respG, contentG = delete_user_tenant_group(self.tenant,
+ resp, content = util.delete_user_tenant_group(self.tenant,
self.tenant_group,
'NonExistinguser',
- str(self.auth_token)
- )
-
- if int(respG['status']) == 500:
+ str(self.auth_token))
+ if int(resp['status']) == 500:
self.fail('IDM fault')
- elif int(respG['status']) == 503:
+ elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(404, int(respG['status']))
+ self.assertEqual(404, int(resp['status']))
def test_delete_user_tenant_group_notfound_xml(self):
- h = httplib2.Http(".cache")
-
- respG, contentG = delete_user_tenant_group_xml(self.tenant,
+ resp, content = util.delete_user_tenant_group_xml(self.tenant,
self.tenant_group,
'NonExistinguser',
- str(self.auth_token)
- )
-
- if int(respG['status']) == 500:
+ str(self.auth_token))
+ if int(resp['status']) == 500:
self.fail('IDM fault')
- elif int(respG['status']) == 503:
+ elif int(resp['status']) == 503:
self.fail('Service Not Available')
- self.assertEqual(404, int(respG['status']))
+ self.assertEqual(404, int(resp['status']))
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/test_tenants.py b/test/unit/test_tenants.py
index a4713c52..0489c195 100644
--- a/test/unit/test_tenants.py
+++ b/test/unit/test_tenants.py
@@ -4,36 +4,32 @@ import sys
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', 'keystone')))
import unittest
-from webtest import TestApp
import httplib2
import json
from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+import test_common as utils
+
class tenant_test(unittest.TestCase):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.tenant = 'test_tenant'
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
def tearDown(self):
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
+ resp, content = utils.delete_tenant(self.tenant, self.auth_token)
+
class create_tenant_test(tenant_test):
-
- def test_tenant_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
- resp, content = create_tenant('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
+ def test_tenant_create(self):
+ resp, content = utils.delete_tenant(self.tenant, str(self.auth_token))
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -44,60 +40,56 @@ class create_tenant_test(tenant_test):
self.fail('Failed due to %d' % int(resp['status']))
def test_tenant_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
+ resp, content = utils.delete_tenant_xml(self.tenant,
+ str(self.auth_token))
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
-
if int(resp['status']) not in (200, 201):
-
self.fail('Failed due to %d' % int(resp['status']))
def test_tenant_create_again(self):
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
def test_tenant_create_again_xml(self):
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
def test_tenant_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = {"tenant": {"id": self.tenant,
"description": "A description ...",
"enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": self.token})
@@ -108,19 +100,20 @@ class create_tenant_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.token,
"ACCEPT": "application/xml"})
@@ -132,19 +125,18 @@ class create_tenant_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = {"tenant": {"id": self.tenant,
"description": "A description ...",
"enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": self.exp_auth_token})
-
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -152,20 +144,21 @@ class create_tenant_test(tenant_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
@@ -177,16 +170,17 @@ class create_tenant_test(tenant_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = {"tenant": {"id": self.tenant,
"description": "A description ...",
"enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
+ resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json"})
if int(resp['status']) == 500:
@@ -196,23 +190,21 @@ class create_tenant_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
-
+ url = '%stenants' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
-
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -220,19 +212,19 @@ class create_tenant_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_tenant_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '{"tenant": { "id": "%s", \
"description": "A description ...", "enabled"\
:true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.disabled_token
- })
+ "X-Auth-Token": self.disabled_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
@@ -241,23 +233,22 @@ class create_tenant_test(tenant_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.disabled_token,
"ACCEPT": "application/xml"})
-
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -265,16 +256,17 @@ class create_tenant_test(tenant_test):
self.assertEqual(403, int(resp['status']))
def test_tenant_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant,
+ str(self.auth_token))
if int(resp['status']) == 200:
self.tenant = content['tenant']['id']
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '{"tenant": { "id": "%s", \
"description": "A description ...", "enabled"\
:true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/json",
"X-Auth-Token": 'nonexsitingtoken'})
@@ -285,19 +277,20 @@ class create_tenant_test(tenant_test):
self.assertEqual(404, int(resp['status']))
def test_tenant_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
content = etree.fromstring(content)
if int(resp['status']) == 200:
self.tenant = content.get('id')
- url = '%stenants' % (URL)
+ url = '%stenants' % (utils.URL)
body = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true" id="%s"> \
<description>A description...</description> \
</tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,
+ resp, content = header.request(url, "POST", body=body,
headers={"Content-Type": "application/xml",
"X-Auth-Token": 'nonexsitingtoken',
"ACCEPT": "application/xml"})
@@ -312,14 +305,13 @@ class create_tenant_test(tenant_test):
class get_tenants_test(tenant_test):
def test_get_tenants(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.auth_token
- })
+ "X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -327,11 +319,11 @@ class get_tenants_test(tenant_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenants_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -342,11 +334,11 @@ class get_tenants_test(tenant_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenants_unauthorized_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.token})
if int(resp['status']) == 500:
@@ -356,11 +348,11 @@ class get_tenants_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_get_tenants_unauthorized_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.token,
"ACCEPT": "application/xml"})
@@ -371,14 +363,13 @@ class get_tenants_test(tenant_test):
self.assertEqual(401, int(resp['status']))
def test_get_tenants_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token
- })
+ "X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
@@ -386,11 +377,11 @@ class get_tenants_test(tenant_test):
self.assertEqual(403, int(resp['status']))
def test_get_tenants_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
@@ -404,11 +395,11 @@ class get_tenants_test(tenant_test):
class get_tenant_test(tenant_test):
def test_get_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -418,11 +409,11 @@ class get_tenant_test(tenant_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -433,11 +424,11 @@ class get_tenant_test(tenant_test):
self.assertEqual(200, int(resp['status']))
def test_get_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, 'tenant_bad')
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -447,11 +438,11 @@ class get_tenant_test(tenant_test):
self.assertEqual(404, int(resp['status']))
def test_get_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, 'tenant_bad')
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -462,11 +453,11 @@ class get_tenant_test(tenant_test):
self.assertEqual(404, int(resp['status']))
def test_get_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',
+ resp, content = header.request(url, "GET", body='{}',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -476,11 +467,11 @@ class get_tenant_test(tenant_test):
self.assertEqual(404, int(resp['status']))
def test_get_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (utils.URL)
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -494,13 +485,13 @@ class get_tenant_test(tenant_test):
class update_tenant_test(tenant_test):
def test_update_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
data = '{"tenant": { "description": "A NEW description..." ,\
"enabled":true }}'
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
body = json.loads(content)
@@ -509,13 +500,14 @@ class update_tenant_test(tenant_test):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body['tenant']['id']))
+ self.assertEqual(self.tenant, body['tenant']['id'])
self.assertEqual('A NEW description...', body['tenant']['description'])
def test_update_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant_xml(self.tenant,
+ str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true"> \
@@ -523,7 +515,7 @@ class update_tenant_test(tenant_test):
</tenant>'
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -534,18 +526,18 @@ class update_tenant_test(tenant_test):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body.get('id')))
+ self.assertEqual(self.tenant, body.get('id'))
self.assertEqual('A NEW description...', desc.text)
def test_update_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
data = '{"tenant": { "description_bad": "A NEW description...",\
"enabled":true }}'
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -555,16 +547,16 @@ class update_tenant_test(tenant_test):
self.assertEqual(400, int(resp['status']))
def test_update_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (utils.URL, self.tenant)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true"> \
<description_bad>A NEW description...</description> \
</tenant>'
#test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,
+ resp, content = header.request(url, "PUT", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -575,13 +567,13 @@ class update_tenant_test(tenant_test):
self.assertEqual(400, int(resp['status']))
def test_update_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (utils.URL)
data = '{"tenant": { "description": "A NEW description...",\
"enabled":true }}'
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,
+ resp, content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -591,16 +583,16 @@ class update_tenant_test(tenant_test):
self.assertEqual(404, int(resp['status']))
def test_update_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
+ header = httplib2.Http(".cache")
+ resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (utils.URL)
data = '<?xml version="1.0" encoding="UTF-8"?> \
<tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
enabled="true"> \
<description_bad>A NEW description...</description> \
</tenant>'
#test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,
+ resp, content = header.request(url, "GET", body=data,
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -614,30 +606,32 @@ class update_tenant_test(tenant_test):
class delete_tenant_test(tenant_test):
def test_delete_tenant_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete111",
+ #resp,content=utils.create_tenant("test_tenant_delete",
+ # str(self.auth_token))
+ resp, content = utils.delete_tenant("test_tenant_delete111",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_tenant_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete111",
+ #resp,content=utils.create_tenant("test_tenant_delete",
+ # str(self.auth_token))
+ resp, content = utils.delete_tenant_xml("test_tenant_delete111",
str(self.auth_token))
self.assertEqual(404, int(resp['status']))
def test_delete_tenant(self):
- resp, content = create_tenant("test_tenant_delete",
+ resp, content = utils.create_tenant("test_tenant_delete",
str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete",
+ resp, content = utils.delete_tenant("test_tenant_delete",
str(self.auth_token))
self.assertEqual(204, int(resp['status']))
def test_delete_tenant_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete",
+ resp, content = utils.create_tenant_xml("test_tenant_delete",
str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete",
+ resp, content = utils.delete_tenant_xml("test_tenant_delete",
str(self.auth_token))
self.assertEqual(204, int(resp['status']))
if __name__ == '__main__':
- unittest.main() \ No newline at end of file
+ unittest.main()
diff --git a/test/unit/test_token.py b/test/unit/test_token.py
index c29061c2..ce33c8f1 100644
--- a/test/unit/test_token.py
+++ b/test/unit/test_token.py
@@ -4,34 +4,29 @@ import sys
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', 'keystone')))
import unittest
-from webtest import TestApp
import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+import test_common as utils
class validate_token(unittest.TestCase):
def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
+ self.token = utils.get_token('joeuser', 'secrete', 'token')
+ self.tenant = utils.get_tenant()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
def tearDown(self):
- delete_token(self.token, self.auth_token)
+ utils.delete_token(self.token, self.auth_token)
def test_validate_token_true(self):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.token, self.tenant)
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -39,12 +34,12 @@ class validate_token(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_validate_token_true_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',
+ header = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.token, self.tenant)
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.auth_token,
"ACCEPT": "application/xml"})
@@ -53,29 +48,28 @@ class validate_token(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
+ self.assertEqual('application/xml', utils.content_type(resp))
def test_validate_token_expired(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token,
+ header = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.exp_auth_token,
self.tenant)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token}
- )
+ "X-Auth-Token": self.exp_auth_token})
if int(resp['status']) == 500:
self.fail('IDM fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_validate_token_expired_xml(self):
- h = httplib2.Http(".cache")
+ header = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token,
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, self.exp_auth_token,
self.tenant)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
"X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
@@ -84,13 +78,13 @@ class validate_token(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(403, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
+ self.assertEqual('application/xml', utils.content_type(resp))
def test_validate_token_invalid(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken',
+ header = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, 'NonExistingToken',
self.tenant)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
@@ -99,13 +93,13 @@ class validate_token(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_validate_token_invalid_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken',
+ header = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (utils.URL, 'NonExistingToken',
self.tenant)
- resp, content = h.request(url, "GET", body='',
+ resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/json",
"X-Auth-Token": self.auth_token})
if int(resp['status']) == 500:
@@ -113,7 +107,7 @@ class validate_token(unittest.TestCase):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
if __name__ == '__main__':
unittest.main()
diff --git a/test/unit/test_version.py b/test/unit/test_version.py
index b5e536a5..76b013ab 100644
--- a/test/unit/test_version.py
+++ b/test/unit/test_version.py
@@ -4,13 +4,9 @@ import sys
sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
'..', '..', '..', '..', 'keystone')))
import unittest
-from webtest import TestApp
import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-from test_common import *
+import test_common as utils
+
class version_test(unittest.TestCase):
@@ -18,22 +14,19 @@ class version_test(unittest.TestCase):
#here to call below method will call as last test case
def test_a_get_version_json(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type":"application/json"})
+ header = httplib2.Http(".cache")
+ resp, content = header.request(utils.URL, "GET", body="",
+ headers={"Content-Type": "application/json"})
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', content_type(resp))
+ self.assertEqual('application/json', utils.content_type(resp))
def test_a_get_version_xml(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
+ header = httplib2.Http(".cache")
+ resp, content = header.request(utils.URL, "GET", body="",
headers={"Content-Type": "application/xml",
"ACCEPT": "application/xml"})
-
self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', content_type(resp))
-
+ self.assertEqual('application/xml', utils.content_type(resp))
+
if __name__ == '__main__':
unittest.main()