summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSai Krishna <saikrishna1511@gmail.com>2011-05-05 19:10:18 +0530
committerSai Krishna <saikrishna1511@gmail.com>2011-05-05 19:10:18 +0530
commit8716d49c66fe50ab2e64ed97079c0a47bb8ea559 (patch)
tree57089569b7d5c521a6dd469e9545d3345b773af2
parent9516c4e0f446244cba4c3f5a9eba0abca45a822f (diff)
parenta0452fe1a376550bddd18987bd6d0d902eb649b4 (diff)
downloadkeystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.tar.gz
keystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.tar.xz
keystone-8716d49c66fe50ab2e64ed97079c0a47bb8ea559.zip
Merge branch 'master' of https://github.com/khussein/keystone
Conflicts: test/unit/test_identity.py
-rw-r--r--HACKING29
-rw-r--r--README.md30
-rwxr-xr-xbin/keystoned26
-rw-r--r--docs/guide/src/docbkx/xsd/atom/atom.xsd10
-rw-r--r--keystone/auth_protocols/auth_basic.py2
-rw-r--r--keystone/auth_protocols/auth_openid.py2
-rw-r--r--keystone/auth_protocols/auth_token.py10
-rw-r--r--keystone/db/sqlalchemy/api.py32
-rw-r--r--keystone/keystone.dbbin0 -> 15360 bytes
-rw-r--r--keystone/logic/service.py92
-rw-r--r--keystone/logic/types/fault.py4
-rw-r--r--keystone/logic/types/tenant.py80
-rw-r--r--keystone/server.py (renamed from keystone/identity.py)13
-rw-r--r--pip-requires1
-rw-r--r--test/unit/test_identity.py4212
15 files changed, 2295 insertions, 2248 deletions
diff --git a/HACKING b/HACKING
index e58d60e5..4841e6d9 100644
--- a/HACKING
+++ b/HACKING
@@ -1,5 +1,5 @@
-Nova Style Commandments
-=======================
+Keystone Style Commandments (pilfered from Nova and added to)
+=============================================================
Step 1: Read http://www.python.org/dev/peps/pep-0008/
Step 2: Read http://www.python.org/dev/peps/pep-0008/ again
@@ -16,7 +16,7 @@ Imports
# vim: tabstop=4 shiftwidth=4 softtabstop=4
{{stdlib imports in human alphabetical order}}
\n
- {{nova imports in human alphabetical order}}
+ {{OpenStack/Keystone imports in human alphabetical order}}
\n
\n
{{begin your code}}
@@ -27,8 +27,9 @@ General
- thou shalt put two newlines twixt toplevel code (funcs, classes, etc)
- thou shalt put one newline twixt methods in classes and anywhere else
- thou shalt not write "except:", use "except Exception:" at the very least
-- thou shalt include your name with TODOs as in "TODO(termie)"
+- thou shalt include your name with TODOs as in "TODO(waldo)"
- thou shalt not name anything the same name as a builtin or reserved word
+- thou shouldeth comment profusely
- thou shalt not violate causality in our time cone, or else
@@ -42,14 +43,12 @@ Human Alphabetical Order Examples
import time
import unittest
- from nova import flags
- from nova import test
- from nova.auth import users
- from nova.endpoint import api
- from nova.endpoint import cloud
+ import keystone.logic.types.fault as fault
+ import keystone.db.sqlalchemy.api as db_api
Docstrings
----------
+Add them to modules, classes, and functions:
"""Summary of the function, class or method, less than 80 characters.
New paragraph after newline that explains in more detail any general
@@ -66,3 +65,15 @@ Docstrings
:returns: description of the return value
"""
+
+Done/Done Criteria
+------------------
+How we define our code is done and ready for release:
+1. PEP-8 compliance
+2. pylint (same rules as Nova)
+3. McCabe 10 or less
+4. 65.258% test coverage
+5. All functional and unit tests pass
+6. Q/A Approval (if applicable - it is for Rackspace Integration dev teams)
+7. No sev A bugs (this shoud have been #1)
+
diff --git a/README.md b/README.md
index fa925ac4..20b28557 100644
--- a/README.md
+++ b/README.md
@@ -41,27 +41,15 @@ SETUP:
------
Install http://pypi.python.org/pypi/setuptools
-
- sudo easy_install bottle
- sudo easy_install eventlet
- sudo easy_install lxml
- sudo easy_install paste
- sudo easy_install pastedeploy
- sudo easy_install pastescript
- sudo easy_install pysqlite
- sudo easy_install sqlalchemy
- sudo easy_install webob
-
-Or using pip:
-
+ sudo easy_install pip
sudo pip install -r pip-requires
RUNNING KEYSTONE:
-----------------
- $ cd keystone
- $ python identity.py
+ $ cd bin
+ $ ./keystoned
RUNNING TEST SERVICE:
@@ -140,7 +128,7 @@ Unit Test on Identity Services
------------------------------
In order to run the unit test on identity services, run from the keystone directory
- python identity.py
+ python server.py
Once the Identity service is running, go to unit test/unit directory
@@ -151,13 +139,3 @@ For more on unit testing please refer
python test_identity --help
-
-DATABASE SCHEMA
----------------
-
- CREATE TABLE groups(group_id varchar(255),group_desc varchar(255),tenant_id varchar(255),FOREIGN KEY(tenant_id) REFERENCES tenant(tenant_id));
- CREATE TABLE tenants(tenant_id varchar(255), tenant_desc varchar(255), tenant_enabled INTEGER, PRIMARY KEY(tenant_id ASC));
- CREATE TABLE token(token_id varchar(255),user_id varchar(255),expires datetime,tenant_id varchar(255));
- CREATE TABLE user_group(user_id varchar(255),group_id varchar(255), FOREIGN KEY(user_id) REFERENCES user(id), FOREIGN KEY(group_id) REFERENCES groups(group_id));
- CREATE TABLE user_tenant(tenant_id varchar(255),user_id varchar(255),FOREIGN KEY(tenant_id) REFERENCES tenant(tenant_id),FOREIGN KEY(user_id) REFERENCES user(id));
- CREATE TABLE users(id varchar(255),password varchar(255),email varchar(255),enabled integer);
diff --git a/bin/keystoned b/bin/keystoned
new file mode 100755
index 00000000..f336ca1d
--- /dev/null
+++ b/bin/keystoned
@@ -0,0 +1,26 @@
+#!/bin/sh
+# Copyright (C) 2011 OpenStack LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# If ../keystone/__init__.py exists, add ../ to the Python search path so
+# that it will override whatever may be installed in the default Python
+# search path.
+script_dir=`dirname $0`
+if [ -f "$script_dir/../keystone/__init__.py" ]
+then
+ PYTHONPATH="$script_dir/..:$PYTHONPATH"
+ export PYTHONPATH
+fi
+
+/usr/bin/env python -m keystone.server $*
diff --git a/docs/guide/src/docbkx/xsd/atom/atom.xsd b/docs/guide/src/docbkx/xsd/atom/atom.xsd
index a619efaa..c515c497 100644
--- a/docs/guide/src/docbkx/xsd/atom/atom.xsd
+++ b/docs/guide/src/docbkx/xsd/atom/atom.xsd
@@ -72,7 +72,7 @@
<xs:attribute name="rel" use="required" type="atom:relation">
<xs:annotation>
<xs:documentation>
- <html:p>TODO</html:p>
+ <html:p>TODO(Jorge)</html:p>
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -80,7 +80,7 @@
<xs:attribute name="type" use="optional" type="xs:string">
<xs:annotation>
<xs:documentation>
- <html:p>TODO</html:p>
+ <html:p>TODO(Jorge)</html:p>
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -88,7 +88,7 @@
<xs:attribute name="href" use="required" type="xs:anyURI">
<xs:annotation>
<xs:documentation>
- <html:p>TODO</html:p>
+ <html:p>TODO(Jorge)</html:p>
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -96,7 +96,7 @@
<xs:attribute name="hreflang" use="optional" type="xs:NMTOKEN">
<xs:annotation>
<xs:documentation>
- <html:p>TODO</html:p>
+ <html:p>TODO(Jorge)</html:p>
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -104,7 +104,7 @@
<xs:attribute name="title" use="optional" type="xs:string">
<xs:annotation>
<xs:documentation>
- <html:p>TODO</html:p>
+ <html:p>TODO(Jorge)</html:p>
</xs:documentation>
</xs:annotation>
</xs:attribute>
diff --git a/keystone/auth_protocols/auth_basic.py b/keystone/auth_protocols/auth_basic.py
index 046ca08e..17f2261a 100644
--- a/keystone/auth_protocols/auth_basic.py
+++ b/keystone/auth_protocols/auth_basic.py
@@ -142,7 +142,7 @@ class AuthProtocol(object):
ssl=(self.service_protocol == 'https'))
resp = conn.getresponse()
data = resp.read()
- #TODO: use a more sophisticated proxy
+ #TODO(ziad): use a more sophisticated proxy
# we are rewriting the headers now
return Response(status=resp.status, body=data)(env, start_response)
diff --git a/keystone/auth_protocols/auth_openid.py b/keystone/auth_protocols/auth_openid.py
index ac9121f7..d68741df 100644
--- a/keystone/auth_protocols/auth_openid.py
+++ b/keystone/auth_protocols/auth_openid.py
@@ -85,7 +85,7 @@ class AuthProtocol(object):
ssl=(self.service_protocol == 'https'))
resp = conn.getresponse()
data = resp.read()
- #TODO: use a more sophisticated proxy
+ #TODO(ziad): use a more sophisticated proxy
# we are rewriting the headers now
return Response(status=resp.status, body=data)(env, start_response)
diff --git a/keystone/auth_protocols/auth_token.py b/keystone/auth_protocols/auth_token.py
index fbc6c622..4c5c5e9c 100644
--- a/keystone/auth_protocols/auth_token.py
+++ b/keystone/auth_protocols/auth_token.py
@@ -113,7 +113,7 @@ class AuthProtocol(object):
def __init__(self, app, conf):
""" Common initialization code """
- #TODO: maybe we rafactor this into a superclass
+ #TODO(ziad): maybe we rafactor this into a superclass
self._init_protocol_common(app, conf) # Applies to all protocols
self._init_protocol(app, conf) # Specific to this protocol
@@ -209,7 +209,7 @@ class AuthProtocol(object):
# Step 1: We need to auth with the keystone service, so get an
# admin token
- #TODO: Need to properly implement this, where to store creds
+ #TODO(ziad): Need to properly implement this, where to store creds
# for now using token from ini
#auth = self.get_admin_auth_token("admin", "secrete", "1")
#admin_token = json.loads(auth)["auth"]["token"]["id"]
@@ -220,7 +220,7 @@ class AuthProtocol(object):
headers = {"Content-type": "application/json",
"Accept": "text/json",
"X-Auth-Token": self.admin_token}
- ##TODO:we need to figure out how to auth to keystone
+ ##TODO(ziad):we need to figure out how to auth to keystone
#since validate_token is a priviledged call
#Khaled's version uses creds to get a token
# "X-Auth-Token": admin_token}
@@ -246,7 +246,7 @@ class AuthProtocol(object):
headers = {"Content-type": "application/json",
"Accept": "text/json",
"X-Auth-Token": self.admin_token}
- ##TODO:we need to figure out how to auth to keystone
+ ##TODO(ziad):we need to figure out how to auth to keystone
#since validate_token is a priviledged call
#Khaled's version uses creds to get a token
# "X-Auth-Token": admin_token}
@@ -294,7 +294,7 @@ class AuthProtocol(object):
ssl=(self.service_protocol == 'https'))
resp = conn.getresponse()
data = resp.read()
- #TODO: use a more sophisticated proxy
+ #TODO(ziad): use a more sophisticated proxy
# we are rewriting the headers now
return Response(status=resp.status, body=data)(self.proxy_headers,
self.start_response)
diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py
index 47768c17..7ca888f4 100644
--- a/keystone/db/sqlalchemy/api.py
+++ b/keystone/db/sqlalchemy/api.py
@@ -44,7 +44,7 @@ def tenant_get_all(session=None):
def tenant_get_page(marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Tenant).filter("id>:marker").params(\
marker = '%s' % marker).order_by\
@@ -53,8 +53,8 @@ def tenant_get_page(marker,limit,session=None):
return session.query(models.Tenant).order_by(\
models.Tenant.id.desc()).limit(limit).all()
#return session.query(models.Tenant).all()
-
-
+
+
def tenant_get_page_markers(marker,limit,session=None):
if not session:
session = get_session()
@@ -120,7 +120,7 @@ def tenant_group_is_empty( id, session=None):
group_id=id).first()
if a_user != None:
return False
-
+
return True
def tenant_delete(id, session=None):
@@ -142,13 +142,13 @@ def tenant_group_get(id, tenant, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(id=id, tenant_id=tenant).first()
-
+
return result
def tenant_group_get_page(tenantId, marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker = '%s' % marker).filter_by(\
@@ -158,8 +158,8 @@ def tenant_group_get_page(tenantId, marker,limit,session=None):
return session.query(models.Group).filter_by(tenant_id=tenantId)\
.order_by(models.Group.id.desc()).limit(limit).all()
#return session.query(models.Tenant).all()
-
-
+
+
def tenant_group_get_page_markers(tenantId, marker,limit,session=None):
if not session:
session = get_session()
@@ -279,7 +279,7 @@ def group_users(id, session=None):
def users_tenant_group_get_page(group_id, marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Users).filter_by(\
group_id=group_id).filter("id>:marker").params(\
@@ -289,9 +289,9 @@ def users_tenant_group_get_page(group_id, marker,limit,session=None):
return session.query(models.Users).filter_by(\
group_id=group_id).order_by(\
models.Users.id.desc()).limit(limit).all()
-
-
-
+
+
+
def users_tenant_group_get_page_markers(group_id, marker,limit,session=None):
if not session:
session = get_session()
@@ -339,7 +339,7 @@ def group_get_all(session=None):
def group_get_page(marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker = '%s' % marker).order_by\
@@ -347,9 +347,9 @@ def group_get_page(marker,limit,session=None):
else:
return session.query(models.Group).order_by(\
models.Group.id.desc()).limit(limit).all()
-
-
-
+
+
+
def group_get_page_markers(marker,limit,session=None):
if not session:
session = get_session()
diff --git a/keystone/keystone.db b/keystone/keystone.db
new file mode 100644
index 00000000..16fcf4d7
--- /dev/null
+++ b/keystone/keystone.db
Binary files differ
diff --git a/keystone/logic/service.py b/keystone/logic/service.py
index ef76867b..40b865b8 100644
--- a/keystone/logic/service.py
+++ b/keystone/logic/service.py
@@ -47,7 +47,7 @@ class IDMService(object):
#
# Look for an existing token, or create one,
- # TODO: Handle tenant/token search
+ # TODO(Jorge): Handle tenant/token search
#
dtoken = db_api.token_for_user(duser.id)
if not dtoken or dtoken.expires < datetime.now():
@@ -124,12 +124,12 @@ class IDMService(object):
# dtenant.desc, dtenant.enabled))
# return tenants.Tenants(ts, [])
-
-
+
+
##
## GET Tenants with Pagination
##
-
+
def get_tenants(self, admin_token, marker, limit, url):
self.__validate_token(admin_token)
@@ -142,13 +142,13 @@ class IDMService(object):
links=[]
if prev:
links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
+ if next:
links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+
+
return tenants.Tenants(ts, links)
-
+
def get_tenant(self, admin_token, tenant_id):
self.__validate_token(admin_token)
@@ -188,11 +188,11 @@ class IDMService(object):
db_api.tenant_delete(dtenant.id)
return None
-
+
#
# Tenant Group Operations
#
-
+
def create_tenant_group(self, admin_token, tenant, group):
self.__validate_token(admin_token)
@@ -201,7 +201,7 @@ class IDMService(object):
if tenant == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
-
+
dtenant = db_api.tenant_get(tenant)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
@@ -209,7 +209,7 @@ class IDMService(object):
if group.group_id == None:
raise fault.BadRequestFault("Expecting a Group Id")
-
+
if db_api.group_get(group.group_id) != None:
raise fault.TenantGroupConflictFault(
"A tenant group with that id already exists")
@@ -223,20 +223,20 @@ class IDMService(object):
return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
-
-
+
+
def get_tenant_groups(self, admin_token, tenantId, marker, limit, url):
self.__validate_token(admin_token)
if tenantId == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
-
+
dtenant = db_api.tenant_get(tenantId)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
ts = []
dtenantgroups = db_api.tenant_group_get_page(tenantId, marker,limit)
-
+
for dtenantgroup in dtenantgroups:
ts.append(tenants.Group(dtenantgroup.id,
dtenantgroup.desc, dtenantgroup.tenant_id))
@@ -244,27 +244,27 @@ class IDMService(object):
links=[]
if prev:
links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
+ if next:
links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+
+
return tenants.Groups(ts, links)
-
+
def get_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_token(admin_token)
-
+
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
-
-
+
+
return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
-
-
+
+
def update_tenant_group(self, admin_token, tenant_id, group_id, group):
self.__validate_token(admin_token)
@@ -275,32 +275,32 @@ class IDMService(object):
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
-
+
if group_id != group.group_id:
raise fault.BadRequestFault("Wrong Data Provided,Group id not matching")
-
+
if str(tenant_id) != str(group.tenant_id):
- raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ")
-
+ raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ")
+
values = {'desc': group.description}
db_api.tenant_group_update(group_id, tenant_id, values)
return tenants.Group(group_id, group.description, tenant_id)
-
+
def delete_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_token(admin_token)
dtenant = db_api.tenant_get(tenant_id)
-
+
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
@@ -311,22 +311,22 @@ class IDMService(object):
db_api.tenant_group_delete(group_id, tenant_id)
return None
-
-
+
+
def get_users_tenant_group(self, admin_token, tenantId, groupId, marker, limit, url):
self.__validate_token(admin_token)
if tenantId == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
-
+
if db_api.tenant_get(tenantId) == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
if db_api.tenant_group_get(groupId, tenantId) == None:
raise fault.ItemNotFoundFault(
"A tenant group with that id not found")
-
+
ts = []
-
+
dgroupusers = db_api.users_tenant_group_get_page( groupId, marker,limit)
for dgroupuser in dgroupusers:
ts.append(tenants.User(dgroupuser.id,
@@ -335,13 +335,13 @@ class IDMService(object):
links=[]
if prev:
links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
+ if next:
links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+
+
return tenants.Users(ts, links)
-
-
+
+
#
# Private Operations
#
diff --git a/keystone/logic/types/fault.py b/keystone/logic/types/fault.py
index fd2e85cc..aa920184 100644
--- a/keystone/logic/types/fault.py
+++ b/keystone/logic/types/fault.py
@@ -116,8 +116,8 @@ class TenantGroupConflictFault(IDMFault):
def __init__(self, msg, details=None, code=409):
super(TenantGroupConflictFault, self).__init__(msg, details, code)
self.key = "tenantGroupConflict"
-
-
+
+
class OverlimitFault(IDMFault):
"A limit has been exceeded"
diff --git a/keystone/logic/types/tenant.py b/keystone/logic/types/tenant.py
index 5e933b9b..c8759628 100644
--- a/keystone/logic/types/tenant.py
+++ b/keystone/logic/types/tenant.py
@@ -109,14 +109,14 @@ class Tenants(object):
def to_xml(self):
dom = etree.Element("tenants")
- dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
-
+ dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
+
for t in self.values:
dom.append(t.to_dom())
-
+
for t in self.links:
dom.append(t.to_dom())
-
+
return etree.tostring(dom)
def to_json(self):
@@ -130,7 +130,7 @@ class Group(object):
"Describes a group in the auth system"
def __init__(self, group_id, description, tenant_id=''):
-
+
self.description = description
self.group_id = group_id
self.tenant_id = tenant_id
@@ -145,8 +145,8 @@ class Group(object):
raise fault.BadRequestFault("Expecting Group")
group_id = root.get("id")
tenant_id = root.get("tenantId")
-
-
+
+
desc = root.find("{http://docs.openstack.org/idm/api/v1.0}"
"description")
if desc == None:
@@ -159,21 +159,21 @@ class Group(object):
def from_json(json_str):
try:
obj = json.loads(json_str)
-
+
if not "group" in obj:
raise fault.BadRequestFault("Expecting group")
group = obj["group"]
-
+
if not "id" in group:
group_id = None
else:
group_id = group["id"]
-
+
if not "tenantId" in group:
tenantId = None
else:
tenantId = group["tenantId"]
-
+
if not "description" in group:
raise fault.BadRequestFault("Expecting Group Description")
description = group["description"]
@@ -186,13 +186,13 @@ class Group(object):
xmlns="http://docs.openstack.org/idm/api/v1.0")
if self.group_id:
dom.set("id", self.group_id)
-
+
if self.tenant_id:
dom.set("tenantId", self.tenant_id)
-
-
+
+
desc = etree.Element("description")
-
+
desc.text = self.description
dom.append(desc)
return dom
@@ -206,7 +206,7 @@ class Group(object):
group["id"] = self.group_id
group["description"] = self.description
group["tenantId"] = self.tenant_id
-
+
return {'group': group}
def to_json(self):
@@ -222,30 +222,30 @@ class Groups(object):
def to_xml(self):
dom = etree.Element("groups")
- dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
-
+ dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
+
for t in self.values:
dom.append(t.to_dom())
-
+
for t in self.links:
dom.append(t.to_dom())
-
+
return etree.tostring(dom)
def to_json(self):
values = [t.to_dict()["group"] for t in self.values]
links = [t.to_dict()["links"] for t in self.links]
return json.dumps({"groups": {"values": values,"links":links}})
-
-
-
+
+
+
class User(object):
"Describes a user in the auth system"
def __init__(self, user_id, email, group_id, tenant_id, enabled):
-
+
self.user_id = user_id
-
+
self.tenant_id = tenant_id
self.email = email
self.enabled = enabled and True or False
@@ -259,8 +259,8 @@ class User(object):
if root == None:
raise fault.BadRequestFault("Expecting Group")
group_id = root.get("id")
-
-
+
+
desc = root.find("{http://docs.openstack.org/idm/api/v1.0}"
"description")
if desc == None:
@@ -273,16 +273,16 @@ class User(object):
def from_json(json_str):
try:
obj = json.loads(json_str)
-
+
if not "group" in obj:
raise fault.BadRequestFault("Expecting group")
group = obj["group"]
-
+
if not "id" in group:
group_id = None
else:
group_id = group["id"]
-
+
if not "description" in group:
raise fault.BadRequestFault("Expecting Group Description")
description = group["description"]
@@ -290,22 +290,22 @@ class User(object):
except (ValueError, TypeError) as e:
raise fault.BadRequestFault("Cannot parse Group", str(e))
"""
-
+
def to_dom(self):
dom = etree.Element("user",
xmlns="http://docs.openstack.org/idm/api/v1.0")
if self.group_id:
dom.set("id", self.group_id)
-
+
if self.tenant_id:
dom.set("tenantId", self.tenant_id)
-
+
if self.tenant_id:
dom.set("email", self.email)
-
+
if self.tenant_id:
dom.set("enabled", self.enabled)
-
+
return dom
def to_xml(self):
@@ -318,7 +318,7 @@ class User(object):
group["email"] = self.email
group["enabled"] = self.enabled
group["tenantId"] = self.tenant_id
-
+
return {'user': user}
def to_json(self):
@@ -334,14 +334,14 @@ class Users(object):
def to_xml(self):
dom = etree.Element("users")
- dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
-
+ dom.set(u"xmlns","http://docs.openstack.org/idm/api/v1.0")
+
for t in self.values:
dom.append(t.to_dom())
-
+
for t in self.links:
dom.append(t.to_dom())
-
+
return etree.tostring(dom)
def to_json(self):
diff --git a/keystone/identity.py b/keystone/server.py
index 18c826ef..c5e53fdf 100644
--- a/keystone/identity.py
+++ b/keystone/server.py
@@ -55,6 +55,7 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'keystone', '__init__.py')):
sys.path.insert(0, POSSIBLE_TOPDIR)
+print POSSIBLE_TOPDIR
import keystone.logic.service as serv
import keystone.logic.types.auth as auth
@@ -150,10 +151,10 @@ def wrap_error(func):
@wrap_error
def get_version_info():
if is_xml_response():
- resp_file = "content/version.xml"
+ resp_file = os.path.join(POSSIBLE_TOPDIR, "keystone/content/version.xml.tpl")
response.content_type = "application/xml"
else:
- resp_file = "content/version.json"
+ resp_file = os.path.join(POSSIBLE_TOPDIR, "keystone/content/version.json.tpl")
response.content_type = "application/json"
hostname = request.environ.get("SERVER_NAME")
port = request.environ.get("SERVER_PORT")
@@ -387,7 +388,11 @@ def get_extension(ext_alias):
#
raise fault.ItemNotFoundFault("The extension is not found")
+def start_server(port=8080):
+ app = exthandler.UrlExtensionFilter(bottle.default_app(), None)
+ wsgi.server(eventlet.listen(('', port)), app)
if __name__ == "__main__":
- app = exthandler.UrlExtensionFilter(bottle.default_app(), None)
- wsgi.server(eventlet.listen(('', 8080)), app)
+ start_server()
+
+
diff --git a/pip-requires b/pip-requires
index 1ef8484b..6eb85607 100644
--- a/pip-requires
+++ b/pip-requires
@@ -5,5 +5,6 @@ paste
pastedeploy
pastescript
pysqlite
+routes
sqlalchemy
webob
diff --git a/test/unit/test_identity.py b/test/unit/test_identity.py
index 76125f79..c7824372 100644
--- a/test/unit/test_identity.py
+++ b/test/unit/test_identity.py
@@ -1,2093 +1,2119 @@
-import os
-import sys
-# Need to access identity module
-sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
- '..', '..', '..', '..', 'keystone')))
-from keystone import identity
-import unittest
-from webtest import TestApp
-import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-
-URL = 'http://localhost:8080/v1.0/'
-
-
-def get_token(user, pswd, kind=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": user,
- "password": pswd}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- token = str(content['auth']['token']['id'])
- if kind == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='', \
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": tenantid,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = {"group": {"id": groupid,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_global_group(auth_token):
- h = httplib2.Http(".cache")
-
- url = '%s/groups' % (URL)
- body = {"group": {"id": 'Admin',
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def get_token_xml(user, pswd, type=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="%s" username="%s" \
- tenantId="77654"/> ' % (pswd, user)
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- dom = etree.fromstring(content)
- root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
- token_root = root.attrib
- token = str(token_root['id'])
- if type == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token_xml(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % tenantid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % groupid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_global_group_xml(auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="Admin"> \
- <description>A description...</description> \
- </group>'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def get_tenant():
- return '1234'
-
-
-def get_user():
- return '1234'
-
-
-def get_userdisabled():
- return '1234'
-
-
-def get_auth_token():
- return '999888777666'
-
-
-def get_exp_auth_token():
- return '000999'
-
-
-def get_disabled_token():
- return '999888777'
-
-
-class identity_test(unittest.TestCase):
-
- #Given _a_ to make inherited test cases in an order.
- #here to call below method will call as last test case
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/json"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
-
-class authorize_test(identity_test):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
-
-
- def tearDown(self):
- delete_token(self.token, self.auth_token)
-
- def test_a_authorize(self):
- resp, content = get_token('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_authorize_xml(self):
- resp, content = get_token_xml('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_a_authorize_user_disaabled(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": "disabled",
- "password": "self.tenant_group='test_tenant_group'secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_disaabled_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_wrong(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username-w": "disabled",
- "password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_a_authorize_user_wrong_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username-w="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
-
-class validate_token(authorize_test):
-
- def test_validate_token_true(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_true_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_expired(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_expired_xml(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_invalid(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_invalid_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
-
-class tenant_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
- def tearDown(self):
- resp, content = delete_tenant(self.tenant, self.auth_token)
-""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
-"""
-
-class create_tenant_test(tenant_test):
-
- def test_tenant_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
-
- resp, content = create_tenant('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- def test_tenant_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- def test_tenant_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenants_test(tenant_test):
-
- def test_get_tenants(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenants_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_test(tenant_test):
-
- def test_get_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_test(tenant_test):
-
- def test_update_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description": "A NEW description..." ,\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body['tenant']['id']))
- self.assertEqual('A NEW description...', \
- body['tenant']['description'])
-
- def test_update_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description>A NEW description...</description> \
- </tenant>'
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description_bad": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '{"tenant": { "description": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_test(tenant_test):
-
- def test_delete_tenant_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
- def test_delete_tenant_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
-
-
-
-class tenant_group_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
- self.tenant_group = 'test_tenant_group'
-
- def tearDown(self):
- resp, content = delete_tenant_group('test_tenant_group', \
- self.tenant, self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
-
-class create_tenant_group_test(tenant_group_test):
-
- def test_tenant_group_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
- resp, content = create_tenant('test_tenant', str(self.auth_token))
-
- respG, contentG = delete_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_groups_test(tenant_group_test):
-
- def test_get_tenant_groups(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenant_groups_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_group_test(tenant_group_test):
-
- def test_get_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_group_test(tenant_group_test):
-
- def test_update_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
-
- data = '{"group": { "id":"%s","description": "A NEW description..." ,\
- "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(self.tenant_group, body['group']['id'])
- self.assertEqual('A NEW description...', \
- body['group']['description'])
-
- def test_update_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
-
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(str(self.tenant_group), str(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '{"group": { "description_bad": "A NEW description...",\
- "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description_bad>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
-
- data = '{"group": { "description": "A NEW description...",\
- "id":"NonexistingID", "tenantId"="test_tenant" }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="NonexistingID", "tenant_id"="test_tenant"> \
- <description_bad>A NEW description...</description> \
- </group>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_group_test(tenant_test):
-
- def test_delete_tenant_group_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group_xml("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
- def test_delete_tenant_group_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
-class create_global_group_test(global_group_test):
-
- def test_global_group_create(self):
-
- respG, contentG = delete_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group(str(self.auth_token))
- self.group = 'test_tenant_group'
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
-
- def test_global_group_create_again(self):
-
- respG, contentG = create_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
-if __name__ == '__main__':
- unittest.main()
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import httplib2
+import json
+from lxml import etree
+import os
+import sys
+from webtest import TestApp
+import unittest
+
+# Need to access server module
+sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
+ '..', '..', '..', '..', 'keystone')))
+from keystone import server
+
+URL = 'http://localhost:8080/v1.0/'
+
+
+def get_token(user, pswd, kind=''):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username": user,
+ "password": pswd}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ token = str(content['auth']['token']['id'])
+ if kind == 'token':
+ return token
+ else:
+ return (resp, content)
+
+
+def delete_token(token, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s' % (URL, token)
+ resp, content = h.request(url, "DELETE", body='', \
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_tenant(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": tenantid,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_tenant_group(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%stenant/%s/groups' % (URL,tenantid)
+ body = {"group": {"id": groupid,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_tenant(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants/%s' % (URL, tenantid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_tenant_group(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_global_group(auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%s/groups' % (URL)
+ body = {"group": {"id": 'Admin',
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_global_group(groupid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups/%s' % (URL, groupid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def get_token_xml(user, pswd, type=''):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="%s" username="%s" \
+ tenantId="77654"/> ' % (pswd, user)
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ dom = etree.fromstring(content)
+ root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
+ token_root = root.attrib
+ token = str(token_root['id'])
+ if type == 'token':
+ return token
+ else:
+ return (resp, content)
+
+
+def delete_token_xml(token, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s' % (URL, token)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_tenant_xml(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % tenantid
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_tenant_group_xml(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups' % (URL,tenantid)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % groupid
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_tenant_xml(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants/%s' % (URL, tenantid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_tenant_group_xml(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_global_group_xml(auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="Admin"> \
+ <description>A description...</description> \
+ </group>'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_global_group_xml(groupid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups/%s' % (URL, groupid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def get_tenant():
+ return '1234'
+
+
+def get_user():
+ return '1234'
+
+
+def get_userdisabled():
+ return '1234'
+
+
+def get_auth_token():
+ return '999888777666'
+
+
+def get_exp_auth_token():
+ return '000999'
+
+
+def get_disabled_token():
+ return '999888777'
+
+
+class server_test(unittest.TestCase):
+
+ #Given _a_ to make inherited test cases in an order.
+ #here to call below method will call as last test case
+
+ def test_get_version_json(self):
+ h = httplib2.Http(".cache")
+ url = URL
+ resp, content = h.request(url, "GET", body="",
+ headers={"Content-Type": "application/json"})
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_get_version_xml(self):
+ h = httplib2.Http(".cache")
+ url = URL
+ resp, content = h.request(url, "GET", body="",
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+
+class authorize_test(server_test):
+
+ def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+
+
+
+ def tearDown(self):
+ delete_token(self.token, self.auth_token)
+
+ def test_a_authorize(self):
+ resp, content = get_token('joeuser', 'secrete')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_a_authorize_xml(self):
+ resp, content = get_token_xml('joeuser', 'secrete')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_a_authorize_user_disaabled(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username": "disabled",
+ "password": "self.tenant_group='test_tenant_group'secrete"}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_a_authorize_user_disaabled_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="secrete" username="disabled" \
+ />'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_a_authorize_user_wrong(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username-w": "disabled",
+ "password": "secrete"}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_a_authorize_user_wrong_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="secrete" username-w="disabled" \
+ />'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+
+class validate_token(authorize_test):
+
+ def test_validate_token_true(self):
+ h = httplib2.Http(".cache")
+
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_true_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_validate_token_expired(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_expired_xml(self):
+ h = httplib2.Http(".cache")
+
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_validate_token_invalid(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_invalid_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+
+class tenant_test(unittest.TestCase):
+
+ def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+
+ def tearDown(self):
+ resp, content = delete_tenant(self.tenant, self.auth_token)
+""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
+"""
+
+class create_tenant_test(tenant_test):
+
+ def test_tenant_create(self):
+ resp, content = delete_tenant('test_tenant', str(self.auth_token))
+
+ resp, content = create_tenant('test_tenant', str(self.auth_token))
+ self.tenant = 'test_tenant'
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(resp['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(resp['status']))
+
+ def test_tenant_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ self.tenant = 'test_tenant'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(resp['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(resp['status']))
+
+ def test_tenant_create_again(self):
+
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(resp['status']))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ def test_tenant_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get("id")
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(resp['status']))
+ if int(resp['status']) == 200:
+ self.tenant = content.get("id")
+
+ def test_tenant_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = '{"tenant": { "id": "%s", \
+ "description": "A description ...", "enabled"\
+ :true } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = '{"tenant": { "id": "%s", \
+ "description": "A description ...", "enabled"\
+ :true } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenants_test(tenant_test):
+
+ def test_get_tenants(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenants_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenants_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenants_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenants_exp_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_tenants_exp_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_test(tenant_test):
+
+ def test_get_tenant(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, 'tenant_bad')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, 'tenant_bad')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class update_tenant_test(tenant_test):
+
+ def test_update_tenant(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '{"tenant": { "description": "A NEW description..." ,\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ body = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(int(self.tenant), int(body['tenant']['id']))
+ self.assertEqual('A NEW description...', \
+ body['tenant']['description'])
+
+ def test_update_tenant_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description>A NEW description...</description> \
+ </tenant>'
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ body = etree.fromstring(content)
+ desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(int(self.tenant), int(body.get('id')))
+ self.assertEqual('A NEW description...', \
+ desc.text)
+
+ def test_update_tenant_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '{"tenant": { "description_bad": "A NEW description...",\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description_bad>A NEW description...</description> \
+ </tenant>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ data = '{"tenant": { "description": "A NEW description...",\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_update_tenant_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description_bad>A NEW description...</description> \
+ </tenant>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class delete_tenant_test(tenant_test):
+
+ def test_delete_tenant_not_found(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete111", \
+ str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant_not_found_xml(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete111", \
+ str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant(self):
+ resp, content = create_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(resp['status']))
+
+ def test_delete_tenant_xml(self):
+ resp, content = create_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(resp['status']))
+
+
+<<<<<<< HEAD
+
+
+class tenant_group_test(unittest.TestCase):
+
+ def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+ self.tenant_group = 'test_tenant_group'
+
+ def tearDown(self):
+ resp, content = delete_tenant_group('test_tenant_group', \
+ self.tenant, self.auth_token)
+ resp, content = delete_tenant(self.tenant, self.auth_token)
+
+
+class create_tenant_group_test(tenant_group_test):
+
+ def test_tenant_group_create(self):
+ resp, content = delete_tenant('test_tenant', str(self.auth_token))
+ resp, content = create_tenant('test_tenant', str(self.auth_token))
+
+ respG, contentG = delete_tenant_group('test_tenant_group', \
+ 'test_tenant', str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ 'test_tenant', str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+ def test_tenant_group_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+ def test_tenant_group_create_again(self):
+
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+
+ def test_tenant_group_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ content = etree.fromstring(content)
+ contentG = etree.fromstring(contentG)
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ def test_tenant_group_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ if int(respG['status']) == 200:
+ self.tenant_group = respG['group']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_groups_test(tenant_group_test):
+
+ def test_get_tenant_groups(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_groups_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_groups_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenant_groups_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenant_groups_exp_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_tenant_groups_exp_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_group_test(tenant_group_test):
+
+ def test_get_tenant_group(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_group_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group_xml(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_group_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_group_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_group_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_group_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class update_tenant_group_test(tenant_group_test):
+
+ def test_update_tenant_group(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+
+ data = '{"group": { "id":"%s","description": "A NEW description..." ,\
+ "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ body = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(self.tenant_group, body['group']['id'])
+ self.assertEqual('A NEW description...', \
+ body['group']['description'])
+
+ def test_update_tenant_group_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ tenantId="%s" id="%s"> \
+ <description>A NEW description...</description> \
+ </group>' % (self.tenant, self.tenant_group)
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+
+ body = etree.fromstring(content)
+ desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(str(self.tenant_group), str(body.get('id')))
+ self.assertEqual('A NEW description...', \
+ desc.text)
+
+ def test_update_tenant_group_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ data = '{"group": { "description_bad": "A NEW description...",\
+ "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
+ #test for Content-Type = application/json
+
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_group_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ tenantId="%s" id="%s"> \
+ <description_bad>A NEW description...</description> \
+ </group>' % (self.tenant, self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_group_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+
+ data = '{"group": { "description": "A NEW description...",\
+ "id":"NonexistingID", "tenantId"="test_tenant" }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_update_tenant_group_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="NonexistingID", "tenant_id"="test_tenant"> \
+ <description_bad>A NEW description...</description> \
+ </group>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class delete_tenant_group_test(tenant_test):
+
+ def test_delete_tenant_group_not_found(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_group("test_tenant_delete111", \
+ "test_tenant", str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant_group_not_found_xml(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_group_xml("test_tenant_delete111", \
+ "test_tenant", str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant_group(self):
+ resp, content = create_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ respG, contentG = delete_tenant_group('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(respG['status']))
+
+ def test_delete_tenant_group_xml(self):
+ resp, content = create_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(respG['status']))
+
+class create_global_group_test(global_group_test):
+
+ def test_global_group_create(self):
+
+ respG, contentG = delete_global_group('test_tenant_group', \
+ str(self.auth_token))
+ respG, contentG = create_global_group(str(self.auth_token))
+ self.group = 'test_tenant_group'
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ if int(respG['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(respG['status']))
+
+
+ def test_global_group_create_again(self):
+
+ respG, contentG = create_global_group('test_tenant_group', \
+ str(self.auth_token))
+ respG, contentG = create_global_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+
+
+ def test_tenant_group_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ if int(respG['status']) == 200:
+ self.tenant_group = respG['group']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+
+ def test_tenant_group_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+
+ def test_tenant_group_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+ def test_tenant_group_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ content = etree.fromstring(content)
+ contentG = etree.fromstring(contentG)
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ def test_tenant_group_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_group_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_group_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def setup():
+ pass
+
+
+def teardown():
+ pass
+
+
+if __name__ == '__main__':
+ setup()
+ try:
+ unittest.main()
+ finally:
+ teardown()