diff options
| author | sirish.bitra <sirish.bitra@gmail.com> | 2011-05-09 20:46:36 +0530 |
|---|---|---|
| committer | sirish.bitra <sirish.bitra@gmail.com> | 2011-05-09 20:46:36 +0530 |
| commit | db0df82f26822fe4a5a2295f8b86b598dfaaa96c (patch) | |
| tree | c2289d700e9b7e29c89435288a8bdbd8103b2e84 | |
| parent | 3c53cee318c841e69cb4f81af5e8747e8bb34338 (diff) | |
| download | keystone-db0df82f26822fe4a5a2295f8b86b598dfaaa96c.tar.gz keystone-db0df82f26822fe4a5a2295f8b86b598dfaaa96c.tar.xz keystone-db0df82f26822fe4a5a2295f8b86b598dfaaa96c.zip | |
Modified test cases
| -rwxr-xr-x | keystone/auth_server.py | 172 | ||||
| -rw-r--r-- | keystone/db/sqlalchemy/api.py | 4 | ||||
| -rw-r--r-- | keystone/logic/service.py | 31 | ||||
| -rw-r--r-- | keystone/logic/types/fault.py | 2 | ||||
| -rw-r--r-- | keystone/logic/types/tenant.py | 6 | ||||
| -rw-r--r-- | test/unit/test_identity.py | 1765 |
6 files changed, 901 insertions, 1079 deletions
diff --git a/keystone/auth_server.py b/keystone/auth_server.py index 36349a42..a80373d5 100755 --- a/keystone/auth_server.py +++ b/keystone/auth_server.py @@ -89,37 +89,79 @@ def get_auth_token(req): return auth_token +def wrap_error(func): + @functools.wraps(func) + def check_error(*args, **kwargs): + print '>>>>>>>>>>>>>>>>>>..' + try: + + return func(*args, **kwargs) + + except Exception as err: + if isinstance(err, fault.IDMFault): + return send_error(err.code, kwargs['req'], err) + else: + logging.exception(err) + return send_error(500, kwargs['req'], fault.IDMFault("Unhandled error", str(err))) + return check_error + + def get_normalized_request_content(model, req): """initialize a model from json/xml contents of request body""" - + if req.content_type == "application/xml": + ret = model.from_xml(req.body) elif req.content_type == "application/json": + ret = model.from_json(req.body) else: + raise fault.IDMFault("I don't understand the content type ", code=415) return ret +def send_error(code, req, result): + content = None + resp = Response() + + resp.headers['content-type'] = None + resp.status = code + + if result: + + if is_xml_response(req): + + content = result.to_xml() + resp.headers['content-type'] = "application/xml" + else: + + content = result.to_json() + resp.headers['content-type'] = "application/json" + + resp.content_type_params={'charset' : 'UTF-8'} + resp.unicode_body = content.decode('UTF-8') + + return resp + def send_result(code, req, result): content = None - resp = Response() - resp.content_type = None - + resp.headers['content-type'] = None resp.status = code if code > 399: return resp - + if result: + if is_xml_response(req): content = result.to_xml() - resp.content_type = "application/xml" + resp.headers['content-type'] = "application/xml" else: content = result.to_json() - resp.content_type = "application/json" + resp.headers['content-type'] = "application/json" - resp.charset = 'UTF-8' + resp.content_type_params={'charset' : 'UTF-8'} resp.unicode_body = content.decode('UTF-8') return resp @@ -128,24 +170,28 @@ class StaticFilesController(wsgi.Controller): def __init__(self, options): self.options = options - + + @wrap_error def get_pdf_contract(self, req): resp = Response() return template.static_file(resp, req, "content/idmdevguide.pdf", root=get_app_root(), mimetype="application/pdf") - + + @wrap_error def get_wadl_contract(): resp = Response() return template.static_file(resp, req, "identity.wadl", root=get_app_root(), mimetype="application/vnd.sun.wadl+xml") + @wrap_error def get_xsd_contract(xsd): resp = Response() return template.static_file(resp, req, "/xsd/" + xsd, root=get_app_root(), mimetype="application/xml") - + + @wrap_error def get_xsd_atom_contract(xsd): resp = Response() return template.static_file(resp, req, "/xsd/atom/" + xsd, @@ -156,7 +202,8 @@ class MiscController(wsgi.Controller): def __init__(self, options): self.options = options - + + @wrap_error def get_version_info(self, req): resp = Response() @@ -184,18 +231,26 @@ class AuthController(wsgi.Controller): def __init__(self, options): self.options = options - + self.request = None + + @wrap_error def authenticate(self, req): + self.request = req + creds = get_normalized_request_content(auth.PasswordCredentials, req) return send_result(200, req, service.authenticate(creds)) - + + @wrap_error def validate_token(self, req, token_id): + belongs_to = None if "belongsTo" in req.GET: belongs_to = req.GET["belongsTo"] rval = service.validate_token(get_auth_token(req), token_id, belongs_to) + return send_result(200, req, rval) - + + @wrap_error def delete_token(self, req, token_id): return send_result(204, req, service.revoke_token(get_auth_token(req), token_id)) @@ -204,12 +259,14 @@ class TenantController(wsgi.Controller): def __init__(self, options): self.options = options - + + @wrap_error def create_tenant(self, req): tenant = get_normalized_request_content(tenants.Tenant, req) return send_result(201, req, service.create_tenant(get_auth_token(req), tenant)) - + + @wrap_error def get_tenants(self, req): marker = None if "marker" in req.GET: @@ -227,16 +284,20 @@ class TenantController(wsgi.Controller): tenants = service.get_tenants(get_auth_token(req), marker, limit, url) return send_result(200, req, tenants) - + + + @wrap_error def get_tenant(self, req, tenant_id): tenant = service.get_tenant(get_auth_token(req), tenant_id) return send_result(200, req, tenant) - + + @wrap_error def update_tenant(self, req, tenant_id): tenant = get_normalized_request_content(tenants.Tenant, req) rval = service.update_tenant(get_auth_token(req), tenant_id, tenant) return send_result(200, req, rval) + @wrap_error def delete_tenant(self, req, tenant_id): rval = service.delete_tenant(get_auth_token(req), tenant_id) return send_result(204, req, rval) @@ -244,13 +305,13 @@ class TenantController(wsgi.Controller): # Tenant Group Methods - + @wrap_error def create_tenant_group(self, req, tenant_id): group = get_normalized_request_content(tenants.Group, req) return send_result(201, req, service.create_tenant_group(get_auth_token(req), \ tenant_id, group)) - + @wrap_error def get_tenant_groups(self, req, tenant_id): marker = None if "marker" in req.GET: @@ -270,32 +331,38 @@ class TenantController(wsgi.Controller): tenant_id, marker, limit, url) return send_result(200, req, groups) + @wrap_error def get_tenant_group(self, req, tenant_id, group_id): tenant = service.get_tenant_group(get_auth_token(req), tenant_id, group_id) return send_result(200, req, tenant) - + @wrap_error def update_tenant_group(self, req, tenant_id, group_id): group = get_normalized_request_content(tenants.Group, req) rval = service.update_tenant_group(get_auth_token(req),\ tenant_id, group_id, group) return send_result(200, req, rval) - + + @wrap_error def delete_tenant_group(self, req, tenant_id, group_id): rval = service.delete_tenant_group(get_auth_token(req), tenant_id, group_id) return send_result(204, req, rval) + @wrap_error def add_user_tenant_group(self, req, tenant_id, group_id, user_id): # TBD # IDMDevguide clarification needed on this property return None - + + @wrap_error def delete_user_tenant_group(self, req, tenant_id, group_id, user_id): # TBD # IDMDevguide clarification needed on this property return None + + @wrap_error def get_user_tenant_group(self, req, tenant_id, group_id, user_id): # TBD # IDMDevguide clarification needed on this property @@ -306,11 +373,13 @@ class UserController(wsgi.Controller): def __init__(self, options): self.options = options + @wrap_error def create_user(self, req, tenant_id): user = get_normalized_request_content(users.User, req) return send_result(201, req, service.create_user(get_auth_token(req), tenant_id, user)) + @wrap_error def get_tenant_users(self, req, tenant_id): marker = None if "marker" in req.GET: @@ -325,7 +394,8 @@ class UserController(wsgi.Controller): req.environ['PATH_INFO']) users = service.get_tenant_users(get_auth_token(req), tenant_id, marker, limit, url) return send_result(200, req, users) - + + @wrap_error def get_user_groups(self, req, tenant_id, user_id): marker = None if "marker" in req.GET: @@ -335,35 +405,39 @@ class UserController(wsgi.Controller): limit = req.GET["limit"] else: limit = 10 - - url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],\ - req.environ.get("SERVER_NAME"),\ - req.environ.get("SERVER_PORT"),\ + url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'], + req.environ.get("SERVER_NAME"), + req.environ.get("SERVER_PORT"), req.environ['PATH_INFO']) - groups = service.get_user_groups(get_auth_token(),\ + groups = service.get_user_groups(get_auth_token(), tenant_id,user_id, marker, limit,url) return send_result(200, groups) - + + @wrap_error def get_user(self, req, tenant_id, user_id): user = service.get_user(get_auth_token(req), tenant_id, user_id) return send_result(200, req, user) - + + @wrap_error def update_user(self, req, user_id, tenant_id): user = get_normalized_request_content(users.User_Update, req) rval = service.update_user(get_auth_token(req), user_id, user, tenant_id) return send_result(200, req, rval) - + + @wrap_error def delete_user(self, req, user_id, tenant_id): rval = service.delete_user(get_auth_token(req), user_id, tenant_id) return send_result(204, req, rval) - + + @wrap_error def set_user_password(self, req, user_id, tenant_id): user = get_normalized_request_content(users.User_Update, req) rval = service.set_user_password(get_auth_token(req), user_id, user, tenant_id) return send_result(204, req, rval) # To be checked with Abdul not finished yet + @wrap_error def set_user_enabled(self, req, user_id, tenant_id): user = get_normalized_request_content(users.User_Update, req) rval = service.enable_disable_user(get_auth_token(req), user_id, user, tenant_id) @@ -372,15 +446,18 @@ class UserController(wsgi.Controller): class GroupsController(wsgi.Controller): - + + def __init__(self, options): self.options = options + @wrap_error def create_group(self, req): group = get_normalized_request_content(tenants.Group, req) return send_result(201, req, service.create_global_group(get_auth_token(req), group)) + @wrap_error def get_groups(self, req): marker = None if "marker" in req.GET: @@ -398,21 +475,25 @@ class GroupsController(wsgi.Controller): groups = service.get_global_groups(get_auth_token(req), marker, limit, url) return send_result(200, req, groups) - + + @wrap_error def get_group(self, req, group_id): tenant = service.get_global_group(get_auth_token(req), group_id) return send_result(200, req, tenant) - + + @wrap_error def update_group(self, req, group_id): group = get_normalized_request_content(tenants.Group, req) rval = service.update_global_group(get_auth_token(req), group_id, group) return send_result(200, req, rval) - + + @wrap_error def delete_group(self, req, group_id): rval = service.delete_global_group(get_auth_token(req), group_id) return send_result(204, req, rval) - + + @wrap_error def get_users_group(self, req, group_id): marker = None if "marker" in req.GET: @@ -432,11 +513,12 @@ class GroupsController(wsgi.Controller): group_id, marker, limit, url) return send_result(200, req, users) - + @wrap_error def add_user_group(self, req, group_id, user_id): return send_result(201, req, service.add_user_global_group(get_auth_token(req), group_id, user_id)) + @wrap_error def delete_user_group(self, req, group_id, user_id): return send_result(204, req, service.delete_user_global_group(get_auth_token(req), @@ -472,15 +554,15 @@ class KeystoneAPI(wsgi.Router): # Tenant Group Operations - mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller, + mapper.connect("/v1.0/tenant/{tenant_id}/groups", controller=tenant_controller, action="create_tenant_group", conditions=dict(method=["POST"])) - mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller, + mapper.connect("/v1.0/tenant/{tenant_id}/groups", controller=tenant_controller, action="get_tenant_groups", conditions=dict(method=["GET"])) - mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + mapper.connect("/v1.0/tenant/{tenant_id}/groups/{group_id}", controller=tenant_controller, action="get_tenant_group", conditions=dict(method=["GET"])) - mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + mapper.connect("/v1.0/tenant/{tenant_id}/groups/{group_id}", controller=tenant_controller, action="update_tenant_group", conditions=dict(method=["PUT"])) - mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + mapper.connect("/v1.0/tenant/{tenant_id}/groups/{group_id}", controller=tenant_controller, action="delete_tenant_group", conditions=dict(method=["DELETE"])) # User Operations diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py index 049c1bac..00e5b0fa 100644 --- a/keystone/db/sqlalchemy/api.py +++ b/keystone/db/sqlalchemy/api.py @@ -535,12 +535,12 @@ def users_get_by_tenant_get_page_markers(tenant_id, marker, limit, session=None) order_by(user.id.desc()).first() if marker is None: marker = first.id - next = session.query(user, uta).join((uta, uta.user_id == user.id)).\ + next, nextuta = session.query(user, uta).join((uta, uta.user_id == user.id)).\ filter(uta.tenant_id == tenant_id).\ filter("id >= :marker").params( marker='%s' % marker).order_by( user.id).limit(int(limit) + 1).all() - prev = session.query(user, uta).join((uta, uta.user_id == user.id)).\ + prev, prevuta = session.query(user, uta).join((uta, uta.user_id == user.id)).\ filter(uta.tenant_id == tenant_id).\ filter("id < :marker").params( marker='%s' % marker).order_by( diff --git a/keystone/logic/service.py b/keystone/logic/service.py index 9b38b76c..6ff6d560 100644 --- a/keystone/logic/service.py +++ b/keystone/logic/service.py @@ -354,19 +354,36 @@ class IDMService(object): def validate_token(self, admin_token, token_id, belongs_to=None): self.__validate_token(admin_token) + if not token_id: + raise fault.UnauthorizedFault("Missing token") + (token, user) = self.__get_dauth_data(token_id) - (dtoken, duser) = self.__get_dauth_data(token_id) + if not token: + raise fault.ItemNotFoundFault("Bad token, please reauthenticate") + if token.expires < datetime.now(): + raise fault.UnauthorizedFault("Token expired, please renew") + if not user.enabled: + raise fault.UserDisabledFault("The user %s has been disabled!" + % user.id) +# if admin: +# for ug in user.groups: +# if ug.group_id == "Admin": +# return (token, user) +# raise fault.ForbiddenFault("You are not authorized " +# "to make this call") + return self.__get_auth_data(token, user) + """(dtoken, duser) = self.__get_dauth_data(token_id) if not dtoken: - raise fault.ItemNotFoundFault("Token not found") + raise fault.UnauthorizedFault("Token not found") if dtoken.expires < datetime.now(): - raise fault.ItemNotFoundFault("Token not found") + raise fault.UnauthorizedFault("Token expired") if belongs_to != None and dtoken.tenant_id != belongs_to: - raise fault.ItemNotFoundFault("Token not found") + raise fault.UnauthorizedFault("Token not found") - return self.__get_auth_data(dtoken, duser) + return self.__get_auth_data(dtoken, duser)""" def revoke_token(self, admin_token, token_id): self.__validate_token(admin_token) @@ -922,7 +939,7 @@ class IDMService(object): ts = [] dusergroups = db_api.groups_get_by_user_get_page(user_id, marker, limit) - print dusergroups + for dusergroup, dusergroupAsso in dusergroups: @@ -1157,7 +1174,7 @@ class IDMService(object): (token, user) = self.__get_dauth_data(token_id) if not token: - raise fault.UnauthorizedFault("Bad token, please reauthenticate") + raise fault.ItemNotFoundFault("Bad token, please reauthenticate") if token.expires < datetime.now(): raise fault.UnauthorizedFault("Token expired, please renew") if not user.enabled: diff --git a/keystone/logic/types/fault.py b/keystone/logic/types/fault.py index db8ef90d..e35ca50a 100644 --- a/keystone/logic/types/fault.py +++ b/keystone/logic/types/fault.py @@ -73,7 +73,7 @@ class BadRequestFault(IDMFault): class UnauthorizedFault(IDMFault): "User is unauthorized" - + def __init__(self, msg, details=None, code=401): super(UnauthorizedFault, self).__init__(msg, details, code) self.key = "unauthorized" diff --git a/keystone/logic/types/tenant.py b/keystone/logic/types/tenant.py index 876a45d8..0e13150b 100644 --- a/keystone/logic/types/tenant.py +++ b/keystone/logic/types/tenant.py @@ -128,7 +128,7 @@ class Tenants(object): class Group(object): "Describes a group in the auth system" - def __init__(self, group_id, description, tenant_id=''): + def __init__(self, group_id, description, tenant_id=None): self.description = description self.group_id = group_id if tenant_id: @@ -147,6 +147,8 @@ class Group(object): group_id = root.get("id") if root.get("tenantId"): tenant_id = root.get("tenantId") + else: + tenant_id = None desc = root.find("{http://docs.openstack.org/idm/api/v1.0}" "description") if desc == None: @@ -179,7 +181,7 @@ class Group(object): description = group["description"] return Group(group_id, description, tenantId) except (ValueError, TypeError) as e: - raise fault.BadRequestFault("Cannot parse Group", str(e)) + raise fault.BadRequestFault("Cannot parse Group.", str(e)) def to_dom(self): dom = etree.Element("group", diff --git a/test/unit/test_identity.py b/test/unit/test_identity.py index 09170a79..1421d00a 100644 --- a/test/unit/test_identity.py +++ b/test/unit/test_identity.py @@ -3,7 +3,7 @@ import sys # Need to access identity module sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), '..', '..', '..', '..', 'keystone'))) -from keystone import auth_server + import unittest from webtest import TestApp import httplib2 @@ -55,7 +55,7 @@ def create_tenant(tenantid, auth_token): def create_tenant_group(groupid, tenantid, auth_token): h = httplib2.Http(".cache") - url = '%stenant/%s/groups' % (URL,tenantid) + url = '%stenant/%s/groups' % (URL, tenantid) body = {"group": {"id": groupid, "description": "A description ..." }} @@ -83,11 +83,11 @@ def delete_tenant_group(groupid, tenantid, auth_token): return (resp, content) -def create_global_group(auth_token): +def create_global_group(groupid, auth_token): h = httplib2.Http(".cache") - url = '%s/groups' % (URL) - body = {"group": {"id": 'Admin', + url = '%sgroups' % (URL) + body ={"group": {"id": groupid, "description": "A description ..." }} resp, content = h.request(url, "POST", body=json.dumps(body), @@ -98,7 +98,7 @@ def create_global_group(auth_token): def delete_global_group(groupid, auth_token): h = httplib2.Http(".cache") - url = '%s/groups/%s' % (URL, groupid) + url = '%sgroups/%s' % (URL, groupid) resp, content = h.request(url, "DELETE", body='{}',\ headers={"Content-Type": "application/json",\ "X-Auth-Token": auth_token}) @@ -153,7 +153,7 @@ def create_tenant_xml(tenantid, auth_token): def create_tenant_group_xml(groupid, tenantid, auth_token): h = httplib2.Http(".cache") - url = '%stenant/%s/groups' % (URL,tenantid) + url = '%stenant/%s/groups' % (URL, tenantid) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="%s"> \ @@ -188,7 +188,7 @@ def delete_tenant_group_xml(groupid, tenantid, auth_token): def create_global_group_xml(auth_token): h = httplib2.Http(".cache") - url = '%s/groups' % (URL) + url = '%sgroups' % (URL) body = '<?xml version="1.0" encoding="UTF-8"?> \ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ id="Admin"> \ @@ -203,7 +203,7 @@ def create_global_group_xml(auth_token): def delete_global_group_xml(groupid, auth_token): h = httplib2.Http(".cache") - url = '%s/groups/%s' % (URL, groupid) + url = '%sgroups/%s' % (URL, groupid) resp, content = h.request(url, "DELETE", body='',\ headers={"Content-Type": "application/xml",\ "X-Auth-Token": auth_token, @@ -234,8 +234,10 @@ def get_exp_auth_token(): def get_disabled_token(): return '999888777' + def content_type(resp): - return resp['content-type'].split(';')[0] + return resp['content-type'].split(';')[0] + class identity_test(unittest.TestCase): @@ -246,8 +248,8 @@ class identity_test(unittest.TestCase): h = httplib2.Http(".cache") url = URL resp, content = h.request(url, "GET", body="", - headers={"Content-Type": "application/json"}) - + headers={"Content-Type":\ + "application/json"}) self.assertEqual(200, int(resp['status'])) self.assertEqual('application/json', content_type(resp)) @@ -273,14 +275,11 @@ class authorize_test(identity_test): self.exp_auth_token = get_exp_auth_token() self.disabled_token = get_disabled_token() - - def tearDown(self): delete_token(self.token, self.auth_token) def test_a_authorize(self): resp, content = get_token('joeuser', 'secrete') - self.assertEqual(200, int(resp['status'])) self.assertEqual('application/json', content_type(resp)) @@ -289,11 +288,11 @@ class authorize_test(identity_test): self.assertEqual(200, int(resp['status'])) self.assertEqual('application/xml', content_type(resp)) - def test_a_authorize_user_disaabled(self): + def test_a_authorize_user_disabled(self): h = httplib2.Http(".cache") url = '%stoken' % URL body = {"passwordCredentials": {"username": "disabled", - "password": "self.tenant_group='test_tenant_group'secrete"}} + "password": "secrete"}} resp, content = h.request(url, "POST", body=json.dumps(body), headers={"Content-Type": "application/json"}) content = json.loads(content) @@ -302,11 +301,11 @@ class authorize_test(identity_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) + self.assertEqual('application/json', content_type(resp)) - def test_a_authorize_user_disaabled_xml(self): + def test_a_authorize_user_disabled_xml(self): h = httplib2.Http(".cache") url = '%stoken' % URL - body = '<?xml version="1.0" encoding="UTF-8"?> \ <passwordCredentials \ xmlns="http://docs.openstack.org/idm/api/v1.0" \ @@ -321,6 +320,7 @@ class authorize_test(identity_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(403, int(resp['status'])) + self.assertEqual('application/xml', content_type(resp)) def test_a_authorize_user_wrong(self): h = httplib2.Http(".cache") @@ -335,6 +335,7 @@ class authorize_test(identity_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(400, int(resp['status'])) + self.assertEqual('application/json', content_type(resp)) def test_a_authorize_user_wrong_xml(self): h = httplib2.Http(".cache") @@ -353,6 +354,7 @@ class authorize_test(identity_test): elif int(resp['status']) == 503: self.fail('Service Not Available') self.assertEqual(400, int(resp['status'])) + self.assertEqual('application/xml', content_type(resp)) class validate_token(authorize_test): @@ -458,8 +460,7 @@ class tenant_test(unittest.TestCase): def tearDown(self): resp, content = delete_tenant(self.tenant, self.auth_token) -""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"} -""" + class create_tenant_test(tenant_test): @@ -716,7 +717,7 @@ class create_tenant_test(tenant_test): self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) + self.assertEqual(404, int(resp['status'])) def test_tenant_create_invalid_token_xml(self): h = httplib2.Http(".cache") @@ -740,7 +741,7 @@ class create_tenant_test(tenant_test): self.fail('IDM fault') elif int(resp['status']) == 503: self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) + self.assertEqual(404, int(resp['status'])) class get_tenants_test(tenant_test): @@ -1074,1042 +1075,762 @@ class delete_tenant_test(tenant_test): self.assertEqual(204, int(resp['status'])) - - class tenant_group_test(unittest.TestCase): - def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() - self.tenant_group = 'test_tenant_group' - - def tearDown(self): - resp, content = delete_tenant_group('test_tenant_group', \ + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + self.tenant_group = 'test_tenant_group' + + def tearDown(self): + resp, content = delete_tenant_group('test_tenant_group', \ self.tenant, self.auth_token) - resp, content = delete_tenant(self.tenant, self.auth_token) + resp, content = delete_tenant(self.tenant, self.auth_token) class create_tenant_group_test(tenant_group_test): - def test_tenant_group_create(self): - resp, content = delete_tenant('test_tenant', str(self.auth_token)) - resp, content = create_tenant('test_tenant', str(self.auth_token)) - - respG, contentG = delete_tenant_group('test_tenant_group', \ - 'test_tenant', str(self.auth_token)) - respG, contentG = create_tenant_group('test_tenant_group', \ - 'test_tenant', str(self.auth_token)) - - self.tenant = 'test_tenant' - self.tenant_group = 'test_tenant_group' - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) - - def test_tenant_group_create_xml(self): - resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) - resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) - respG, contentG = delete_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - - self.tenant = 'test_tenant' - self.tenant_group = 'test_tenant_group' - content = etree.fromstring(content) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - - if int(respG['status']) not in (200, 201): - - self.fail('Failed due to %d' % int(respG['status'])) - - def test_tenant_group_create_again(self): - - resp, content = create_tenant("test_tenant", str(self.auth_token)) - - respG, contentG = create_tenant_group('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - - if int(respG['status']) == 200: - self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] - if int(respG['status']) == 500: - self.fail('IDM fault') - elif int(respG['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] - - def test_tenant_group_create_again_xml(self): - - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - - content = etree.fromstring(content) - contentG = etree.fromstring(contentG) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") - - if int(respG['status']) == 500: - self.fail('IDM fault') - elif int(respG['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") - - def test_tenant_group_create_forbidden_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - if int(respG['status']) == 200: - self.tenant_group = respG['group']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..." - }} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json", - "X-Auth-Token": self.token}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_forbidden_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", \ - "X-Auth-Token": self.token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_expired_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..." - }} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_expired_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant - - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", \ - "X-Auth-Token": self.exp_auth_token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_missing_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_missing_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_disabled_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '{"group": { "id": "%s", \ - "description": "A description ..." } }' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.disabled_token}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_disabled_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", - "X-Auth-Token": self.disabled_token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_invalid_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '{"group": { "id": "%s", \ - "description": "A description ..." } }' % self.tenant - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": 'nonexsitingtoken'}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_invalid_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": 'nonexsitingtoken', - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) + def test_tenant_group_creates(self): + resp, content = delete_tenant('test_tenant', str(self.auth_token)) + resp, content = create_tenant('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + if int(respG['status']) not in (200, 201): + self.fail('Failed due to %d' % int(respG['status'])) + + def test_tenant_group_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + if int(respG['status']) not in (200, 201): + self.fail('Failed due to %d' % int(respG['status'])) + + def test_tenant_group_create_again(self): + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + + def test_tenant_group_create_again_xml(self): + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + contentG = etree.fromstring(contentG) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + def test_tenant_group_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + if int(respG['status']) == 200: + self.tenant_group = respG['group']['id'] + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"><description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_group_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + + def test_tenant_group_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + + def test_tenant_group_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_group_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_tenant_group_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) class get_tenant_groups_test(tenant_group_test): - def test_get_tenant_groups(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - - url = '%stenant/%s/groups' % (URL,self.tenant) - - resp, content = h.request(url, "GET", body='{}',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - - def test_get_tenant_groups_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - - respG, contentG = create_tenant_group_xml(self.tenant_group,\ - self.tenant, str(self.auth_token)) - - url = '%stenant/%s/groups' % (URL,self.tenant) - - resp, content = h.request(url, "GET", body='',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - - def test_get_tenant_groups_forbidden_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL,self.tenant) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_get_tenant_groups_forbidden_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL,self.tenant) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_get_tenant_groups_exp_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL,self.tenant) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.exp_auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_get_tenant_groups_exp_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups' % (URL,self.tenant) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.exp_auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) + def test_get_tenant_groups(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_groups_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group_xml(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_groups_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenant_groups_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenant_groups_exp_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_get_tenant_groups_exp_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) class get_tenant_group_test(tenant_group_test): - def test_get_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - - def test_get_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group_xml(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - - def test_get_tenant_group_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) - - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) - - def test_get_tenant_group_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) - - def test_get_tenant_group_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='{}',\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) - - def test_get_tenant_group_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') - - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body='',\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) + def test_get_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/xml + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) class update_tenant_group_test(tenant_group_test): - def test_update_tenant_group(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) - - data = '{"group": { "id":"%s","description": "A NEW description..." ,\ - "tenantId":"%s" }}' % (self.tenant_group,self.tenant) - #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - body = json.loads(content) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - self.assertEqual(self.tenant_group, body['group']['id']) - self.assertEqual('A NEW description...', \ - body['group']['description']) - - def test_update_tenant_group_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group) - data = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - tenantId="%s" id="%s"> \ - <description>A NEW description...</description> \ - </group>' % (self.tenant, self.tenant_group) - - #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data,\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - - body = etree.fromstring(content) - desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(200, int(resp['status'])) - self.assertEqual(str(self.tenant_group), str(body.get('id'))) - self.assertEqual('A NEW description...', \ - desc.text) - - def test_update_tenant_group_bad(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) - data = '{"group": { "description_bad": "A NEW description...",\ - "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant) - #test for Content-Type = application/json - - resp, content = h.request(url, "PUT", body=data,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(400, int(resp['status'])) - - def test_update_tenant_group_bad_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) - data = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - tenantId="%s" id="%s"> \ - <description_bad>A NEW description...</description> \ - </group>' % (self.tenant, self.tenant_group) - #test for Content-Type = application/json - resp, content = h.request(url, "PUT", body=data,\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(400, int(resp['status'])) - - def test_update_tenant_group_not_found(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - respG, contentG = create_tenant_group(self.tenant_group,\ - self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) - - data = '{"group": { "description": "A NEW description...",\ - "id":"NonexistingID", "tenantId"="test_tenant" }}' - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) - - def test_update_tenant_group_not_found_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant(self.tenant, str(self.auth_token)) - url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) - data = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="NonexistingID", "tenant_id"="test_tenant"> \ - <description_bad>A NEW description...</description> \ - </group>' - #test for Content-Type = application/json - resp, content = h.request(url, "GET", body=data,\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": self.auth_token, - "ACCEPT": "application/xml"}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(404, int(resp['status'])) + def test_update_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = delete_tenant_group(self.tenant_group, \ + self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + + data = '{"group": { "id":"%s","description": "A NEW description..." ,\ + "tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + + + body = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(self.tenant_group, body['group']['id']) + self.assertEqual('A NEW description...', \ + body['group']['description']) + + def test_update_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = delete_tenant(self.tenant, str(self.auth_token)) + + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + resp, content = delete_tenant_group(self.tenant_group, \ + self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group) + + data = '<group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data, + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + + + body = etree.fromstring(content) + desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + self.assertEqual(200, int(resp['status'])) + self.assertEqual(str(self.tenant_group), str(body.get('id'))) + self.assertEqual('A NEW description...', desc.text) + + def test_update_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = delete_tenant_group(self.tenant_group, \ + self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '{"group": { "description_bad": "A NEW description...",\ + "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = delete_tenant_group(self.tenant_group, \ + self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description_bad>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + resp, content = delete_tenant_group(self.tenant_group, \ + self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + + data = '{"group": { "description": "A NEW description...",\ + "id":"NonexistingID", "tenantId"="test_tenant" }}' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_update_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="NonexistingID", "tenant_id"="test_tenant"> \ + <description_bad>A NEW description...</description> \ + </group>' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) class delete_tenant_group_test(tenant_test): - def test_delete_tenant_group_not_found(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_group("test_tenant_delete111", \ - "test_tenant", str(self.auth_token)) - self.assertEqual(404, int(resp['status'])) - - def test_delete_tenant_group_not_found_xml(self): - #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_group_xml("test_tenant_delete111", \ - "test_tenant", str(self.auth_token)) - self.assertEqual(404, int(resp['status'])) - - def test_delete_tenant_group(self): - resp, content = create_tenant("test_tenant_delete", \ - str(self.auth_token)) - respG, contentG = create_tenant_group('test_tenant_group_delete', \ - "test_tenant_delete", str(self.auth_token)) - respG, contentG = delete_tenant_group('test_tenant_group_delete', \ - "test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant("test_tenant_delete", \ - str(self.auth_token)) - self.assertEqual(204, int(respG['status'])) - - def test_delete_tenant_group_xml(self): - resp, content = create_tenant_xml("test_tenant_delete", \ - str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \ - "test_tenant_delete", str(self.auth_token)) - respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \ - "test_tenant_delete", str(self.auth_token)) - resp, content = delete_tenant_xml("test_tenant_delete", \ - str(self.auth_token)) - self.assertEqual(204, int(respG['status'])) + def test_delete_tenant_group_not_found(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) -class global_group_test(unittest.TestCase): + def test_delete_tenant_group_not_found_xml(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group_xml("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) - def setUp(self): - self.token = get_token('joeuser', 'secrete', 'token') - self.tenant = get_tenant() - self.user = get_user() - self.userdisabled = get_userdisabled() - self.auth_token = get_auth_token() - self.exp_auth_token = get_exp_auth_token() - self.disabled_token = get_disabled_token() - self.tenant_group = 'test_tenant_group' - - def tearDown(self): - resp, content = delete_tenant_group('test_tenant_group', \ - self.tenant, self.auth_token) - resp, content = delete_tenant(self.tenant, self.auth_token) + def test_delete_tenant_group(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) + def test_delete_tenant_group_xml(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) +class global_group_test(unittest.TestCase): -class create_global_group_test(global_group_test): + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + self.tenant_group = 'test_tenant_group' - def test_global_group_create(self): + def tearDown(self): + resp, content = delete_tenant_group('test_tenant_group', \ + self.tenant, self.auth_token) + resp, content = delete_tenant(self.tenant, self.auth_token) - respG, contentG = delete_global_group('test_tenant_group', \ - str(self.auth_token)) - respG, contentG = create_global_group(str(self.auth_token)) - self.group = 'test_tenant_group' - if int(respG['status']) == 500: - self.fail('IDM fault') - elif int(respG['status']) == 503: - self.fail('Service Not Available') - if int(respG['status']) not in (200, 201): - self.fail('Failed due to %d' % int(respG['status'])) +class create_global_group_test(global_group_test): - def test_global_group_create_again(self): + def test_global_group_create(self): - respG, contentG = create_global_group('test_tenant_group', \ - str(self.auth_token)) - respG, contentG = create_global_group('test_tenant_group', \ - "test_tenant", str(self.auth_token)) + respG, contentG = delete_global_group('test_global_group', \ + str(self.auth_token)) + respG, contentG = create_global_group('test_global_group', \ + str(self.auth_token)) + self.group = 'test_global_group' + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + if int(respG['status']) not in (200, 201): + self.fail('Failed due to %d' % int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] - if int(respG['status']) == 500: - self.fail('IDM fault') - elif int(respG['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content['tenant']['id'] - self.tenant_group = contentG['group']['id'] -class create_tenant_group_test(tenant_group_test): + def test_global_group_create_again(self): + respG, contentG = delete_global_group('test_global_group', \ + str(self.auth_token)) + respG, contentG = create_global_group('test_global_group', \ + str(self.auth_token)) + respG, contentG = create_global_group('test_global_group', \ + str(self.auth_token)) + + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] - def test_tenant_group_create_forbidden_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - if int(respG['status']) == 200: - self.tenant_group = respG['group']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..." - }} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json", - "X-Auth-Token": self.token}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - - def test_tenant_group_create_expired_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..." - }} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json", - "X-Auth-Token": self.exp_auth_token}) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_missing_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = {"group": {"id": self.tenant_group, - "description": "A description ..."}} - resp, content = h.request(url, "POST", body=json.dumps(body), - headers={"Content-Type": "application/json"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_disabled_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '{"group": { "id": "%s", \ - "description": "A description ..." } }' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": self.disabled_token}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_invalid_token(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '{"group": { "id": "%s", \ - "description": "A description ..." } }' % self.tenant - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/json",\ - "X-Auth-Token": 'nonexsitingtoken'}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - - - def test_tenant_group_create_xml(self): - resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) - resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) - respG, contentG = delete_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - - self.tenant = 'test_tenant' - self.tenant_group = 'test_tenant_group' - content = etree.fromstring(content) - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - - if int(respG['status']) not in (200, 201): - - self.fail('Failed due to %d' % int(respG['status'])) - - def test_tenant_group_create_again_xml(self): - - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - respG, contentG = create_tenant_group_xml('test_tenant_group', \ - "test_tenant", str(self.auth_token)) - - content = etree.fromstring(content) - contentG = etree.fromstring(contentG) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") - - if int(respG['status']) == 500: - self.fail('IDM fault') - elif int(respG['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(409, int(respG['status'])) - if int(respG['status']) == 200: - self.tenant = content.get("id") - self.tenant_group = contentG.get("id") - - def test_tenant_group_create_forbidden_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant("test_tenant", str(self.auth_token)) - if int(resp['status']) == 200: - self.tenant = content['tenant']['id'] - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", \ - "X-Auth-Token": self.token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_expired_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant - - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", \ - "X-Auth-Token": self.exp_auth_token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_missing_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) - - def test_tenant_group_create_disabled_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml", - "X-Auth-Token": self.disabled_token, - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(403, int(resp['status'])) - - def test_tenant_group_create_invalid_token_xml(self): - h = httplib2.Http(".cache") - resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) - content = etree.fromstring(content) - if int(resp['status']) == 200: - self.tenant = content.get('id') - - url = '%stenant/%s/groups' % (URL, self.tenant) - body = '<?xml version="1.0" encoding="UTF-8"?> \ - <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ - id="%s"> \ - <description>A description...</description> \ - </group>' % self.tenant_group - resp, content = h.request(url, "POST", body=body,\ - headers={"Content-Type": "application/xml",\ - "X-Auth-Token": 'nonexsitingtoken', - "ACCEPT": "application/xml"}) - - if int(resp['status']) == 500: - self.fail('IDM fault') - elif int(resp['status']) == 503: - self.fail('Service Not Available') - self.assertEqual(401, int(resp['status'])) if __name__ == '__main__': unittest.main() |
