diff options
| author | Sai Krishna <saikrishna1511@gmail.com> | 2011-05-05 19:10:18 +0530 |
|---|---|---|
| committer | Sai Krishna <saikrishna1511@gmail.com> | 2011-05-05 19:10:18 +0530 |
| commit | 8716d49c66fe50ab2e64ed97079c0a47bb8ea559 (patch) | |
| tree | 57089569b7d5c521a6dd469e9545d3345b773af2 | |
| parent | 9516c4e0f446244cba4c3f5a9eba0abca45a822f (diff) | |
| parent | a0452fe1a376550bddd18987bd6d0d902eb649b4 (diff) | |
| download | keystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.tar.gz keystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.tar.xz keystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.zip | |
Merge branch 'master' of https://github.com/khussein/keystone
Conflicts:
test/unit/test_identity.py
| -rw-r--r-- | HACKING | 29 | ||||
| -rw-r--r-- | README.md | 30 | ||||
| -rwxr-xr-x | bin/keystoned | 26 | ||||
| -rw-r--r-- | docs/guide/src/docbkx/xsd/atom/atom.xsd | 10 | ||||
| -rw-r--r-- | keystone/auth_protocols/auth_basic.py | 2 | ||||
| -rw-r--r-- | keystone/auth_protocols/auth_openid.py | 2 | ||||
| -rw-r--r-- | keystone/auth_protocols/auth_token.py | 10 | ||||
| -rw-r--r-- | keystone/db/sqlalchemy/api.py | 32 | ||||
| -rw-r--r-- | keystone/keystone.db | bin | 0 -> 15360 bytes | |||
| -rw-r--r-- | keystone/logic/service.py | 92 | ||||
| -rw-r--r-- | keystone/logic/types/fault.py | 4 | ||||
| -rw-r--r-- | keystone/logic/types/tenant.py | 80 | ||||
| -rw-r--r-- | keystone/server.py (renamed from keystone/identity.py) | 13 | ||||
| -rw-r--r-- | pip-requires | 1 | ||||
| -rw-r--r-- | test/unit/test_identity.py | 4212 |
15 files changed, 2295 insertions, 2248 deletions
@@ -1,5 +1,5 @@ -Nova Style Commandments -======================= +Keystone Style Commandments (pilfered from Nova and added to) +============================================================= Step 1: Read http://www.python.org/dev/peps/pep-0008/ Step 2: Read http://www.python.org/dev/peps/pep-0008/ again @@ -16,7 +16,7 @@ Imports # vim: tabstop=4 shiftwidth=4 softtabstop=4 {{stdlib imports in human alphabetical order}} \n - {{nova imports in human alphabetical order}} + {{OpenStack/Keystone imports in human alphabetical order}} \n \n {{begin your code}} @@ -27,8 +27,9 @@ General - thou shalt put two newlines twixt toplevel code (funcs, classes, etc) - thou shalt put one newline twixt methods in classes and anywhere else - thou shalt not write "except:", use "except Exception:" at the very least -- thou shalt include your name with TODOs as in "TODO(termie)" +- thou shalt include your name with TODOs as in "TODO(waldo)" - thou shalt not name anything the same name as a builtin or reserved word +- thou shouldeth comment profusely - thou shalt not violate causality in our time cone, or else @@ -42,14 +43,12 @@ Human Alphabetical Order Examples import time import unittest - from nova import flags - from nova import test - from nova.auth import users - from nova.endpoint import api - from nova.endpoint import cloud + import keystone.logic.types.fault as fault + import keystone.db.sqlalchemy.api as db_api Docstrings ---------- +Add them to modules, classes, and functions: """Summary of the function, class or method, less than 80 characters. New paragraph after newline that explains in more detail any general @@ -66,3 +65,15 @@ Docstrings :returns: description of the return value """ + +Done/Done Criteria +------------------ +How we define our code is done and ready for release: +1. PEP-8 compliance +2. pylint (same rules as Nova) +3. McCabe 10 or less +4. 65.258% test coverage +5. All functional and unit tests pass +6. Q/A Approval (if applicable - it is for Rackspace Integration dev teams) +7. No sev A bugs (this shoud have been #1) + @@ -41,27 +41,15 @@ SETUP: ------ Install http://pypi.python.org/pypi/setuptools - - sudo easy_install bottle - sudo easy_install eventlet - sudo easy_install lxml - sudo easy_install paste - sudo easy_install pastedeploy - sudo easy_install pastescript - sudo easy_install pysqlite - sudo easy_install sqlalchemy - sudo easy_install webob - -Or using pip: - + sudo easy_install pip sudo pip install -r pip-requires RUNNING KEYSTONE: ----------------- - $ cd keystone - $ python identity.py + $ cd bin + $ ./keystoned RUNNING TEST SERVICE: @@ -140,7 +128,7 @@ Unit Test on Identity Services ------------------------------ In order to run the unit test on identity services, run from the keystone directory - python identity.py + python server.py Once the Identity service is running, go to unit test/unit directory @@ -151,13 +139,3 @@ For more on unit testing please refer python test_identity --help - -DATABASE SCHEMA ---------------- - - CREATE TABLE groups(group_id varchar(255),group_desc varchar(255),tenant_id varchar(255),FOREIGN KEY(tenant_id) REFERENCES tenant(tenant_id)); - CREATE TABLE tenants(tenant_id varchar(255), tenant_desc varchar(255), tenant_enabled INTEGER, PRIMARY KEY(tenant_id ASC)); - CREATE TABLE token(token_id varchar(255),user_id varchar(255),expires datetime,tenant_id varchar(255)); - CREATE TABLE user_group(user_id varchar(255),group_id varchar(255), FOREIGN KEY(user_id) REFERENCES user(id), FOREIGN KEY(group_id) REFERENCES groups(group_id)); - CREATE TABLE user_tenant(tenant_id varchar(255),user_id varchar(255),FOREIGN KEY(tenant_id) REFERENCES tenant(tenant_id),FOREIGN KEY(user_id) REFERENCES user(id)); - CREATE TABLE users(id varchar(255),password varchar(255),email varchar(255),enabled integer); diff --git a/bin/keystoned b/bin/keystoned new file mode 100755 index 00000000..f336ca1d --- /dev/null +++ b/bin/keystoned @@ -0,0 +1,26 @@ +#!/bin/sh +# Copyright (C) 2011 OpenStack LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# If ../keystone/__init__.py exists, add ../ to the Python search path so +# that it will override whatever may be installed in the default Python +# search path. +script_dir=`dirname $0` +if [ -f "$script_dir/../keystone/__init__.py" ] +then + PYTHONPATH="$script_dir/..:$PYTHONPATH" + export PYTHONPATH +fi + +/usr/bin/env python -m keystone.server $* diff --git a/docs/guide/src/docbkx/xsd/atom/atom.xsd b/docs/guide/src/docbkx/xsd/atom/atom.xsd index a619efaa..c515c497 100644 --- a/docs/guide/src/docbkx/xsd/atom/atom.xsd +++ b/docs/guide/src/docbkx/xsd/atom/atom.xsd @@ -72,7 +72,7 @@ <xs:attribute name="rel" use="required" type="atom:relation"> <xs:annotation> <xs:documentation> - <html:p>TODO</html:p> + <html:p>TODO(Jorge)</html:p> </xs:documentation> </xs:annotation> </xs:attribute> @@ -80,7 +80,7 @@ <xs:attribute name="type" use="optional" type="xs:string"> <xs:annotation> <xs:documentation> - <html:p>TODO</html:p> + <html:p>TODO(Jorge)</html:p> </xs:documentation> </xs:annotation> </xs:attribute> @@ -88,7 +88,7 @@ <xs:attribute name="href" use="required" type="xs:anyURI"> <xs:annotation> <xs:documentation> - <html:p>TODO</html:p> + <html:p>TODO(Jorge)</html:p> </xs:documentation> </xs:annotation> </xs:attribute> @@ -96,7 +96,7 @@ <xs:attribute name="hreflang" use="optional" type="xs:NMTOKEN"> <xs:annotation> <xs:documentation> - <html:p>TODO</html:p> + <html:p>TODO(Jorge)</html:p> </xs:documentation> </xs:annotation> </xs:attribute> @@ -104,7 +104,7 @@ <xs:attribute name="title" use="optional" type="xs:string"> <xs:annotation> <xs:documentation> - <html:p>TODO</html:p> + <html:p>TODO(Jorge)</html:p> </xs:documentation> </xs:annotation> </xs:attribute> diff --git a/keystone/auth_protocols/auth_basic.py b/keystone/auth_protocols/auth_basic.py index 046ca08e..17f2261a 100644 --- a/keystone/auth_protocols/auth_basic.py +++ b/keystone/auth_protocols/auth_basic.py @@ -142,7 +142,7 @@ class AuthProtocol(object): ssl=(self.service_protocol == 'https')) resp = conn.getresponse() data = resp.read() - #TODO: use a more sophisticated proxy + #TODO(ziad): use a more sophisticated proxy # we are rewriting the headers now return Response(status=resp.status, body=data)(env, start_response) diff --git a/keystone/auth_protocols/auth_openid.py b/keystone/auth_protocols/auth_openid.py index ac9121f7..d68741df 100644 --- a/keystone/auth_protocols/auth_openid.py +++ b/keystone/auth_protocols/auth_openid.py @@ -85,7 +85,7 @@ class AuthProtocol(object): ssl=(self.service_protocol == 'https')) resp = conn.getresponse() data = resp.read() - #TODO: use a more sophisticated proxy + #TODO(ziad): use a more sophisticated proxy # we are rewriting the headers now return Response(status=resp.status, body=data)(env, start_response) diff --git a/keystone/auth_protocols/auth_token.py b/keystone/auth_protocols/auth_token.py index fbc6c622..4c5c5e9c 100644 --- a/keystone/auth_protocols/auth_token.py +++ b/keystone/auth_protocols/auth_token.py @@ -113,7 +113,7 @@ class AuthProtocol(object): def __init__(self, app, conf): """ Common initialization code """ - #TODO: maybe we rafactor this into a superclass + #TODO(ziad): maybe we rafactor this into a superclass self._init_protocol_common(app, conf) # Applies to all protocols self._init_protocol(app, conf) # Specific to this protocol @@ -209,7 +209,7 @@ class AuthProtocol(object): # Step 1: We need to auth with the keystone service, so get an # admin token - #TODO: Need to properly implement this, where to store creds + #TODO(ziad): Need to properly implement this, where to store creds # for now using token from ini #auth = self.get_admin_auth_token("admin", "secrete", "1") #admin_token = json.loads(auth)["auth"]["token"]["id"] @@ -220,7 +220,7 @@ class AuthProtocol(object): headers = {"Content-type": "application/json", "Accept": "text/json", "X-Auth-Token": self.admin_token} - ##TODO:we need to figure out how to auth to keystone + ##TODO(ziad):we need to figure out how to auth to keystone #since validate_token is a priviledged call #Khaled's version uses creds to get a token # "X-Auth-Token": admin_token} @@ -246,7 +246,7 @@ class AuthProtocol(object): headers = {"Content-type": "application/json", "Accept": "text/json", "X-Auth-Token": self.admin_token} - ##TODO:we need to figure out how to auth to keystone + ##TODO(ziad):we need to figure out how to auth to keystone #since validate_token is a priviledged call #Khaled's version uses creds to get a token # "X-Auth-Token": admin_token} @@ -294,7 +294,7 @@ class AuthProtocol(object): ssl=(self.service_protocol == 'https')) resp = conn.getresponse() data = resp.read() - #TODO: use a more sophisticated proxy + #TODO(ziad): use a more sophisticated proxy # we are rewriting the headers now return Response(status=resp.status, body=data)(self.proxy_headers, self.start_response) diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py index 47768c17..7ca888f4 100644 --- a/keystone/db/sqlalchemy/api.py +++ b/keystone/db/sqlalchemy/api.py @@ -44,7 +44,7 @@ def tenant_get_all(session=None): def tenant_get_page(marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Tenant).filter("id>:marker").params(\ marker = '%s' % marker).order_by\ @@ -53,8 +53,8 @@ def tenant_get_page(marker,limit,session=None): return session.query(models.Tenant).order_by(\ models.Tenant.id.desc()).limit(limit).all() #return session.query(models.Tenant).all() - - + + def tenant_get_page_markers(marker,limit,session=None): if not session: session = get_session() @@ -120,7 +120,7 @@ def tenant_group_is_empty( id, session=None): group_id=id).first() if a_user != None: return False - + return True def tenant_delete(id, session=None): @@ -142,13 +142,13 @@ def tenant_group_get(id, tenant, session=None): if not session: session = get_session() result = session.query(models.Group).filter_by(id=id, tenant_id=tenant).first() - + return result def tenant_group_get_page(tenantId, marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Group).filter("id>:marker").params(\ marker = '%s' % marker).filter_by(\ @@ -158,8 +158,8 @@ def tenant_group_get_page(tenantId, marker,limit,session=None): return session.query(models.Group).filter_by(tenant_id=tenantId)\ .order_by(models.Group.id.desc()).limit(limit).all() #return session.query(models.Tenant).all() - - + + def tenant_group_get_page_markers(tenantId, marker,limit,session=None): if not session: session = get_session() @@ -279,7 +279,7 @@ def group_users(id, session=None): def users_tenant_group_get_page(group_id, marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Users).filter_by(\ group_id=group_id).filter("id>:marker").params(\ @@ -289,9 +289,9 @@ def users_tenant_group_get_page(group_id, marker,limit,session=None): return session.query(models.Users).filter_by(\ group_id=group_id).order_by(\ models.Users.id.desc()).limit(limit).all() - - - + + + def users_tenant_group_get_page_markers(group_id, marker,limit,session=None): if not session: session = get_session() @@ -339,7 +339,7 @@ def group_get_all(session=None): def group_get_page(marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Group).filter("id>:marker").params(\ marker = '%s' % marker).order_by\ @@ -347,9 +347,9 @@ def group_get_page(marker,limit,session=None): else: return session.query(models.Group).order_by(\ models.Group.id.desc()).limit(limit).all() - - - + + + def group_get_page_markers(marker,limit,session=None): if not session: session = get_session() diff --git a/keystone/keystone.db b/keystone/keystone.db Binary files differnew file mode 100644 index 00000000..16fcf4d7 --- /dev/null +++ b/keystone/keystone.db diff --git a/keystone/logic/service.py b/keystone/logic/service.py index ef76867b..40b865b8 100644 --- a/keystone/logic/service.py +++ b/keystone/logic/service.py @@ -47,7 +47,7 @@ class IDMService(object): # # Look for an existing token, or create one, - # TODO: Handle tenant/token search + # TODO(Jorge): Handle tenant/token search # dtoken = db_api.token_for_user(duser.id) if not dtoken or dtoken.expires < datetime.now(): @@ -124,12 +124,12 @@ class IDMService(object): # dtenant.desc, dtenant.enabled)) # return tenants.Tenants(ts, []) - - + + ## ## GET Tenants with Pagination ## - + def get_tenants(self, admin_token, marker, limit, url): self.__validate_token(admin_token) @@ -142,13 +142,13 @@ class IDMService(object): links=[] if prev: links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: + if next: links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + + return tenants.Tenants(ts, links) - + def get_tenant(self, admin_token, tenant_id): self.__validate_token(admin_token) @@ -188,11 +188,11 @@ class IDMService(object): db_api.tenant_delete(dtenant.id) return None - + # # Tenant Group Operations # - + def create_tenant_group(self, admin_token, tenant, group): self.__validate_token(admin_token) @@ -201,7 +201,7 @@ class IDMService(object): if tenant == None: raise fault.BadRequestFault("Expecting a Tenant Id") - + dtenant = db_api.tenant_get(tenant) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") @@ -209,7 +209,7 @@ class IDMService(object): if group.group_id == None: raise fault.BadRequestFault("Expecting a Group Id") - + if db_api.group_get(group.group_id) != None: raise fault.TenantGroupConflictFault( "A tenant group with that id already exists") @@ -223,20 +223,20 @@ class IDMService(object): return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id) - - + + def get_tenant_groups(self, admin_token, tenantId, marker, limit, url): self.__validate_token(admin_token) if tenantId == None: raise fault.BadRequestFault("Expecting a Tenant Id") - + dtenant = db_api.tenant_get(tenantId) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + ts = [] dtenantgroups = db_api.tenant_group_get_page(tenantId, marker,limit) - + for dtenantgroup in dtenantgroups: ts.append(tenants.Group(dtenantgroup.id, dtenantgroup.desc, dtenantgroup.tenant_id)) @@ -244,27 +244,27 @@ class IDMService(object): links=[] if prev: links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: + if next: links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + + return tenants.Groups(ts, links) - + def get_tenant_group(self, admin_token, tenant_id, group_id): self.__validate_token(admin_token) - + dtenant = db_api.tenant_get(tenant_id) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") - - + + return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id) - - + + def update_tenant_group(self, admin_token, tenant_id, group_id, group): self.__validate_token(admin_token) @@ -275,32 +275,32 @@ class IDMService(object): dtenant = db_api.tenant_get(tenant_id) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") - + if group_id != group.group_id: raise fault.BadRequestFault("Wrong Data Provided,Group id not matching") - + if str(tenant_id) != str(group.tenant_id): - raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ") - + raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ") + values = {'desc': group.description} db_api.tenant_group_update(group_id, tenant_id, values) return tenants.Group(group_id, group.description, tenant_id) - + def delete_tenant_group(self, admin_token, tenant_id, group_id): self.__validate_token(admin_token) dtenant = db_api.tenant_get(tenant_id) - + if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") @@ -311,22 +311,22 @@ class IDMService(object): db_api.tenant_group_delete(group_id, tenant_id) return None - - + + def get_users_tenant_group(self, admin_token, tenantId, groupId, marker, limit, url): self.__validate_token(admin_token) if tenantId == None: raise fault.BadRequestFault("Expecting a Tenant Id") - + if db_api.tenant_get(tenantId) == None: raise fault.ItemNotFoundFault("The tenant not found") - + if db_api.tenant_group_get(groupId, tenantId) == None: raise fault.ItemNotFoundFault( "A tenant group with that id not found") - + ts = [] - + dgroupusers = db_api.users_tenant_group_get_page( groupId, marker,limit) for dgroupuser in dgroupusers: ts.append(tenants.User(dgroupuser.id, @@ -335,13 +335,13 @@ class IDMService(object): links=[] if prev: links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: + if next: links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + + return tenants.Users(ts, links) - - + + # # Private Operations # diff --git a/keystone/logic/types/fault.py b/keystone/logic/types/fault.py index fd2e85cc..aa920184 100644 --- a/keystone/logic/types/fault.py +++ b/keystone/logic/types/fault.py @@ -116,8 +116,8 @@ class TenantGroupConflictFault(IDMFault): def __init__(self, msg, details=None, code=409): super(TenantGroupConflictFault, self).__init__(msg, details, code) self.key = "tenantGroupConflict" - - + + class OverlimitFault(IDMFault): "A limit has been exceeded" diff --git a/keystone/logic/types/tenant.py b/keystone/logic/types/tenant.py index 5e933b9b..c8759628 100644 --- a/keystone/logic/types/tenant.py +++ b/keystone/logic/types/tenant.py @@ -109,14 +109,14 @@ class Tenants(object): def to_xml(self): dom = etree.Element("tenants") - dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") - + dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") + for t in self.values: dom.append(t.to_dom()) - + for t in self.links: dom.append(t.to_dom()) - + return etree.tostring(dom) def to_json(self): @@ -130,7 +130,7 @@ class Group(object): "Describes a group in the auth system" def __init__(self, group_id, description, tenant_id=''): - + self.description = description self.group_id = group_id self.tenant_id = tenant_id @@ -145,8 +145,8 @@ class Group(object): raise fault.BadRequestFault("Expecting Group") group_id = root.get("id") tenant_id = root.get("tenantId") - - + + desc = root.find("{http://docs.openstack.org/idm/api/v1.0}" "description") if desc == None: @@ -159,21 +159,21 @@ class Group(object): def from_json(json_str): try: obj = json.loads(json_str) - + if not "group" in obj: raise fault.BadRequestFault("Expecting group") group = obj["group"] - + if not "id" in group: group_id = None else: group_id = group["id"] - + if not "tenantId" in group: tenantId = None else: tenantId = group["tenantId"] - + if not "description" in group: raise fault.BadRequestFault("Expecting Group Description") description = group["description"] @@ -186,13 +186,13 @@ class Group(object): xmlns="http://docs.openstack.org/idm/api/v1.0") if self.group_id: dom.set("id", self.group_id) - + if self.tenant_id: dom.set("tenantId", self.tenant_id) - - + + desc = etree.Element("description") - + desc.text = self.description dom.append(desc) return dom @@ -206,7 +206,7 @@ class Group(object): group["id"] = self.group_id group["description"] = self.description group["tenantId"] = self.tenant_id - + return {'group': group} def to_json(self): @@ -222,30 +222,30 @@ class Groups(object): def to_xml(self): dom = etree.Element("groups") - dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") - + dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") + for t in self.values: dom.append(t.to_dom()) - + for t in self.links: dom.append(t.to_dom()) - + return etree.tostring(dom) def to_json(self): values = [t.to_dict()["group"] for t in self.values] links = [t.to_dict()["links"] for t in self.links] return json.dumps({"groups": {"values": values,"links":links}}) - - - + + + class User(object): "Describes a user in the auth system" def __init__(self, user_id, email, group_id, tenant_id, enabled): - + self.user_id = user_id - + self.tenant_id = tenant_id self.email = email self.enabled = enabled and True or False @@ -259,8 +259,8 @@ class User(object): if root == None: raise fault.BadRequestFault("Expecting Group") group_id = root.get("id") - - + + desc = root.find("{http://docs.openstack.org/idm/api/v1.0}" "description") if desc == None: @@ -273,16 +273,16 @@ class User(object): def from_json(json_str): try: obj = json.loads(json_str) - + if not "group" in obj: raise fault.BadRequestFault("Expecting group") group = obj["group"] - + if not "id" in group: group_id = None else: group_id = group["id"] - + if not "description" in group: raise fault.BadRequestFault("Expecting Group Description") description = group["description"] @@ -290,22 +290,22 @@ class User(object): except (ValueError, TypeError) as e: raise fault.BadRequestFault("Cannot parse Group", str(e)) """ - + def to_dom(self): dom = etree.Element("user", xmlns="http://docs.openstack.org/idm/api/v1.0") if self.group_id: dom.set("id", self.group_id) - + if self.tenant_id: dom.set("tenantId", self.tenant_id) - + if self.tenant_id: dom.set("email", self.email) - + if self.tenant_id: dom.set("enabled", self.enabled) - + return dom def to_xml(self): @@ -318,7 +318,7 @@ class User(object): group["email"] = self.email group["enabled"] = self.enabled group["tenantId"] = self.tenant_id - + return {'user': user} def to_json(self): @@ -334,14 +334,14 @@ class Users(object): def to_xml(self): dom = etree.Element("users") - dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") - + dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0") + for t in self.values: dom.append(t.to_dom()) - + for t in self.links: dom.append(t.to_dom()) - + return etree.tostring(dom) def to_json(self): diff --git a/keystone/identity.py b/keystone/server.py index 18c826ef..c5e53fdf 100644 --- a/keystone/identity.py +++ b/keystone/server.py @@ -55,6 +55,7 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir)) if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'keystone', '__init__.py')): sys.path.insert(0, POSSIBLE_TOPDIR) +print POSSIBLE_TOPDIR import keystone.logic.service as serv import keystone.logic.types.auth as auth @@ -150,10 +151,10 @@ def wrap_error(func): @wrap_error def get_version_info(): if is_xml_response(): - resp_file = "content/version.xml" + resp_file = os.path.join(POSSIBLE_TOPDIR, "keystone/content/version.xml.tpl") response.content_type = "application/xml" else: - resp_file = "content/version.json" + resp_file = os.path.join(POSSIBLE_TOPDIR, "keystone/content/version.json.tpl") response.content_type = "application/json" hostname = request.environ.get("SERVER_NAME") port = request.environ.get("SERVER_PORT") @@ -387,7 +388,11 @@ def get_extension(ext_alias): # raise fault.ItemNotFoundFault("The extension is not found") +def start_server(port=8080): + app = exthandler.UrlExtensionFilter(bottle.default_app(), None) + wsgi.server(eventlet.listen(('', port)), app) if __name__ == "__main__": - app = exthandler.UrlExtensionFilter(bottle.default_app(), None) - wsgi.server(eventlet.listen(('', 8080)), app) + start_server() + + diff --git a/pip-requires b/pip-requires index 1ef8484b..6eb85607 100644 --- a/pip-requires +++ b/pip-requires @@ -5,5 +5,6 @@ paste pastedeploy pastescript pysqlite +routes sqlalchemy webob diff --git a/test/unit/test_identity.py b/test/unit/test_identity.py index 76125f79..c7824372 100644 --- a/test/unit/test_identity.py +++ b/test/unit/test_identity.py @@ -1,2093 +1,2119 @@ -import os
-import sys
-# Need to access identity module
-sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
- '..', '..', '..', '..', 'keystone')))
-from keystone import identity
-import unittest
-from webtest import TestApp
-import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-
-URL = 'http://localhost:8080/v1.0/'
-
-
-def get_token(user, pswd, kind=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": user,
- "password": pswd}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- token = str(content['auth']['token']['id'])
- if kind == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='', \
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": tenantid,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = {"group": {"id": groupid,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_global_group(auth_token):
- h = httplib2.Http(".cache")
-
- url = '%s/groups' % (URL)
- body = {"group": {"id": 'Admin',
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def get_token_xml(user, pswd, type=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="%s" username="%s" \
- tenantId="77654"/> ' % (pswd, user)
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- dom = etree.fromstring(content)
- root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
- token_root = root.attrib
- token = str(token_root['id'])
- if type == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token_xml(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % tenantid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % groupid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_global_group_xml(auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="Admin"> \
- <description>A description...</description> \
- </group>'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def get_tenant():
- return '1234'
-
-
-def get_user():
- return '1234'
-
-
-def get_userdisabled():
- return '1234'
-
-
-def get_auth_token():
- return '999888777666'
-
-
-def get_exp_auth_token():
- return '000999'
-
-
-def get_disabled_token():
- return '999888777'
-
-
-class identity_test(unittest.TestCase):
-
- #Given _a_ to make inherited test cases in an order.
- #here to call below method will call as last test case
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/json"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
-
-class authorize_test(identity_test):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
-
-
- def tearDown(self):
- delete_token(self.token, self.auth_token)
-
- def test_a_authorize(self):
- resp, content = get_token('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_authorize_xml(self):
- resp, content = get_token_xml('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_a_authorize_user_disaabled(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": "disabled",
- "password": "self.tenant_group='test_tenant_group'secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_disaabled_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_wrong(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username-w": "disabled",
- "password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_a_authorize_user_wrong_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username-w="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
-
-class validate_token(authorize_test):
-
- def test_validate_token_true(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_true_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_expired(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_expired_xml(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_invalid(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_invalid_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
-
-class tenant_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
- def tearDown(self):
- resp, content = delete_tenant(self.tenant, self.auth_token)
-""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
-"""
-
-class create_tenant_test(tenant_test):
-
- def test_tenant_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
-
- resp, content = create_tenant('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- def test_tenant_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- def test_tenant_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenants_test(tenant_test):
-
- def test_get_tenants(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenants_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_test(tenant_test):
-
- def test_get_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_test(tenant_test):
-
- def test_update_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description": "A NEW description..." ,\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body['tenant']['id']))
- self.assertEqual('A NEW description...', \
- body['tenant']['description'])
-
- def test_update_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description>A NEW description...</description> \
- </tenant>'
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description_bad": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '{"tenant": { "description": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_test(tenant_test):
-
- def test_delete_tenant_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
- def test_delete_tenant_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
-
-
-
-class tenant_group_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
- self.tenant_group = 'test_tenant_group'
-
- def tearDown(self):
- resp, content = delete_tenant_group('test_tenant_group', \
- self.tenant, self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
-
-class create_tenant_group_test(tenant_group_test):
-
- def test_tenant_group_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
- resp, content = create_tenant('test_tenant', str(self.auth_token))
-
- respG, contentG = delete_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_groups_test(tenant_group_test):
-
- def test_get_tenant_groups(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenant_groups_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_group_test(tenant_group_test):
-
- def test_get_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_group_test(tenant_group_test):
-
- def test_update_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
-
- data = '{"group": { "id":"%s","description": "A NEW description..." ,\
- "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(self.tenant_group, body['group']['id'])
- self.assertEqual('A NEW description...', \
- body['group']['description'])
-
- def test_update_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
-
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(str(self.tenant_group), str(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '{"group": { "description_bad": "A NEW description...",\
- "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description_bad>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
-
- data = '{"group": { "description": "A NEW description...",\
- "id":"NonexistingID", "tenantId"="test_tenant" }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="NonexistingID", "tenant_id"="test_tenant"> \
- <description_bad>A NEW description...</description> \
- </group>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_group_test(tenant_test):
-
- def test_delete_tenant_group_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group_xml("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
- def test_delete_tenant_group_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
-class create_global_group_test(global_group_test):
-
- def test_global_group_create(self):
-
- respG, contentG = delete_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group(str(self.auth_token))
- self.group = 'test_tenant_group'
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
-
- def test_global_group_create_again(self):
-
- respG, contentG = create_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
-if __name__ == '__main__':
- unittest.main()
+# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import httplib2 +import json +from lxml import etree +import os +import sys +from webtest import TestApp +import unittest + +# Need to access server module +sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), + '..', '..', '..', '..', 'keystone'))) +from keystone import server + +URL = 'http://localhost:8080/v1.0/' + + +def get_token(user, pswd, kind=''): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username": user, + "password": pswd}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + token = str(content['auth']['token']['id']) + if kind == 'token': + return token + else: + return (resp, content) + + +def delete_token(token, auth_token): + h = httplib2.Http(".cache") + url = '%stoken/%s' % (URL, token) + resp, content = h.request(url, "DELETE", body='', \ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_tenant(tenantid, auth_token): + h = httplib2.Http(".cache") + + url = '%stenants' % (URL) + body = {"tenant": {"id": tenantid, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_tenant_group(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + + url = '%stenant/%s/groups' % (URL,tenantid) + body = {"group": {"id": groupid, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_tenant(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants/%s' % (URL, tenantid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_tenant_group(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_global_group(auth_token): + h = httplib2.Http(".cache") + + url = '%s/groups' % (URL) + body = {"group": {"id": 'Admin', + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_global_group(groupid, auth_token): + h = httplib2.Http(".cache") + url = '%s/groups/%s' % (URL, groupid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def get_token_xml(user, pswd, type=''): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="%s" username="%s" \ + tenantId="77654"/> ' % (pswd, user) + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + dom = etree.fromstring(content) + root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token") + token_root = root.attrib + token = str(token_root['id']) + if type == 'token': + return token + else: + return (resp, content) + + +def delete_token_xml(token, auth_token): + h = httplib2.Http(".cache") + url = '%stoken/%s' % (URL, token) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_tenant_xml(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % tenantid + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_tenant_group_xml(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups' % (URL,tenantid) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % groupid + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_tenant_xml(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants/%s' % (URL, tenantid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_tenant_group_xml(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_global_group_xml(auth_token): + h = httplib2.Http(".cache") + url = '%s/groups' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="Admin"> \ + <description>A description...</description> \ + </group>' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_global_group_xml(groupid, auth_token): + h = httplib2.Http(".cache") + url = '%s/groups/%s' % (URL, groupid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def get_tenant(): + return '1234' + + +def get_user(): + return '1234' + + +def get_userdisabled(): + return '1234' + + +def get_auth_token(): + return '999888777666' + + +def get_exp_auth_token(): + return '000999' + + +def get_disabled_token(): + return '999888777' + + +class server_test(unittest.TestCase): + + #Given _a_ to make inherited test cases in an order. + #here to call below method will call as last test case + + def test_get_version_json(self): + h = httplib2.Http(".cache") + url = URL + resp, content = h.request(url, "GET", body="", + headers={"Content-Type": "application/json"}) + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_get_version_xml(self): + h = httplib2.Http(".cache") + url = URL + resp, content = h.request(url, "GET", body="", + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + +class authorize_test(server_test): + + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + + + + def tearDown(self): + delete_token(self.token, self.auth_token) + + def test_a_authorize(self): + resp, content = get_token('joeuser', 'secrete') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_a_authorize_xml(self): + resp, content = get_token_xml('joeuser', 'secrete') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_a_authorize_user_disaabled(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username": "disabled", + "password": "self.tenant_group='test_tenant_group'secrete"}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_a_authorize_user_disaabled_xml(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="secrete" username="disabled" \ + />' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_a_authorize_user_wrong(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username-w": "disabled", + "password": "secrete"}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_a_authorize_user_wrong_xml(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="secrete" username-w="disabled" \ + />' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + +class validate_token(authorize_test): + + def test_validate_token_true(self): + h = httplib2.Http(".cache") + + url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_true_xml(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_validate_token_expired(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_expired_xml(self): + h = httplib2.Http(".cache") + + url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_validate_token_invalid(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_invalid_xml(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + +class tenant_test(unittest.TestCase): + + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + + def tearDown(self): + resp, content = delete_tenant(self.tenant, self.auth_token) +""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"} +""" + +class create_tenant_test(tenant_test): + + def test_tenant_create(self): + resp, content = delete_tenant('test_tenant', str(self.auth_token)) + + resp, content = create_tenant('test_tenant', str(self.auth_token)) + self.tenant = 'test_tenant' + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(resp['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(resp['status'])) + + def test_tenant_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + self.tenant = 'test_tenant' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(resp['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(resp['status'])) + + def test_tenant_create_again(self): + + resp, content = create_tenant("test_tenant", str(self.auth_token)) + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(resp['status'])) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + def test_tenant_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get("id") + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(resp['status'])) + if int(resp['status']) == 200: + self.tenant = content.get("id") + + def test_tenant_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = '{"tenant": { "id": "%s", \ + "description": "A description ...", "enabled"\ + :true } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = '{"tenant": { "id": "%s", \ + "description": "A description ...", "enabled"\ + :true } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenants_test(tenant_test): + + def test_get_tenants(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenants_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenants_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenants_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenants_exp_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_get_tenants_exp_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_test(tenant_test): + + def test_get_tenant(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, 'tenant_bad') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, 'tenant_bad') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class update_tenant_test(tenant_test): + + def test_update_tenant(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '{"tenant": { "description": "A NEW description..." ,\ + "enabled":true }}' + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + body = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(int(self.tenant), int(body['tenant']['id'])) + self.assertEqual('A NEW description...', \ + body['tenant']['description']) + + def test_update_tenant_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description>A NEW description...</description> \ + </tenant>' + + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + body = etree.fromstring(content) + desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(int(self.tenant), int(body.get('id'))) + self.assertEqual('A NEW description...', \ + desc.text) + + def test_update_tenant_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '{"tenant": { "description_bad": "A NEW description...",\ + "enabled":true }}' + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description_bad>A NEW description...</description> \ + </tenant>' + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + data = '{"tenant": { "description": "A NEW description...",\ + "enabled":true }}' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_update_tenant_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description_bad>A NEW description...</description> \ + </tenant>' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class delete_tenant_test(tenant_test): + + def test_delete_tenant_not_found(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete111", \ + str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant_not_found_xml(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete111", \ + str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(resp['status'])) + + def test_delete_tenant_xml(self): + resp, content = create_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(resp['status'])) + + +<<<<<<< HEAD + + +class tenant_group_test(unittest.TestCase): + + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + self.tenant_group = 'test_tenant_group' + + def tearDown(self): + resp, content = delete_tenant_group('test_tenant_group', \ + self.tenant, self.auth_token) + resp, content = delete_tenant(self.tenant, self.auth_token) + + +class create_tenant_group_test(tenant_group_test): + + def test_tenant_group_create(self): + resp, content = delete_tenant('test_tenant', str(self.auth_token)) + resp, content = create_tenant('test_tenant', str(self.auth_token)) + + respG, contentG = delete_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + + def test_tenant_group_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + + def test_tenant_group_create_again(self): + + resp, content = create_tenant("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + + def test_tenant_group_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + content = etree.fromstring(content) + contentG = etree.fromstring(contentG) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + def test_tenant_group_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + if int(respG['status']) == 200: + self.tenant_group = respG['group']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_groups_test(tenant_group_test): + + def test_get_tenant_groups(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_groups_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group_xml(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_groups_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenant_groups_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenant_groups_exp_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_get_tenant_groups_exp_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_group_test(tenant_group_test): + + def test_get_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group_xml(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class update_tenant_group_test(tenant_group_test): + + def test_update_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + + data = '{"group": { "id":"%s","description": "A NEW description..." ,\ + "tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + body = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(self.tenant_group, body['group']['id']) + self.assertEqual('A NEW description...', \ + body['group']['description']) + + def test_update_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + + body = etree.fromstring(content) + desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(str(self.tenant_group), str(body.get('id'))) + self.assertEqual('A NEW description...', \ + desc.text) + + def test_update_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '{"group": { "description_bad": "A NEW description...",\ + "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description_bad>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + + data = '{"group": { "description": "A NEW description...",\ + "id":"NonexistingID", "tenantId"="test_tenant" }}' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_update_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="NonexistingID", "tenant_id"="test_tenant"> \ + <description_bad>A NEW description...</description> \ + </group>' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class delete_tenant_group_test(tenant_test): + + def test_delete_tenant_group_not_found(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant_group_not_found_xml(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group_xml("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant_group(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) + + def test_delete_tenant_group_xml(self): + resp, content = create_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) + +class create_global_group_test(global_group_test): + + def test_global_group_create(self): + + respG, contentG = delete_global_group('test_tenant_group', \ + str(self.auth_token)) + respG, contentG = create_global_group(str(self.auth_token)) + self.group = 'test_tenant_group' + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + if int(respG['status']) not in (200, 201): + self.fail('Failed due to %d' % int(respG['status'])) + + + def test_global_group_create_again(self): + + respG, contentG = create_global_group('test_tenant_group', \ + str(self.auth_token)) + respG, contentG = create_global_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + + + def test_tenant_group_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + if int(respG['status']) == 200: + self.tenant_group = respG['group']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + + def test_tenant_group_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + + + def test_tenant_group_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + + def test_tenant_group_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + content = etree.fromstring(content) + contentG = etree.fromstring(contentG) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + def test_tenant_group_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def setup(): + pass + + +def teardown(): + pass + + +if __name__ == '__main__': + setup() + try: + unittest.main() + finally: + teardown() |
