diff options
| author | Ramana Juvvadi <rjuvvadi@hcl.com> | 2011-05-07 13:08:13 -0500 |
|---|---|---|
| committer | Ramana Juvvadi <rjuvvadi@hcl.com> | 2011-05-07 13:08:13 -0500 |
| commit | 6eacad3fb064fd2ebfc99e1efafbc108a1b91090 (patch) | |
| tree | 52c81a254b5506f15433339465265fac3b3259f2 | |
| parent | 2e1b2f77383f3214f5cb287682e4e864b3ee44aa (diff) | |
| download | keystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.tar.gz keystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.tar.xz keystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.zip | |
Added some more functions through Routes and mapper
| -rwxr-xr-x | keystone/auth_server.py | 250 | ||||
| -rw-r--r-- | keystone/db/sqlalchemy/api.py | 286 | ||||
| -rw-r--r-- | keystone/logic/service.py | 592 | ||||
| -rw-r--r-- | keystone/logic/types/fault.py | 43 | ||||
| -rw-r--r-- | keystone/logic/types/user.py | 268 | ||||
| -rw-r--r-- | test/unit/test_identity.py | 4185 |
6 files changed, 3417 insertions, 2207 deletions
diff --git a/keystone/auth_server.py b/keystone/auth_server.py index f0cf0a7c..ec8bf6df 100755 --- a/keystone/auth_server.py +++ b/keystone/auth_server.py @@ -58,15 +58,27 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'keystone', '__init__.py')): from queryext import exthandler from keystone.common import wsgi import keystone.logic.service as serv +import keystone.logic.types.tenant as tenants import keystone.logic.types.auth as auth +import keystone.logic.types.fault as fault +import keystone.logic.types.user as users service = serv.IDMService() + def is_xml_response(req): if not "Accept" in req.headers: return False return req.content_type == "application/xml" + +def get_auth_token(req): + auth_token = None + if "X-Auth-Token" in req.headers: + auth_token = req.headers["X-Auth-Token"] + return auth_token + + def get_normalized_request_content(model, req): """initialize a model from json/xml contents of request body""" @@ -78,6 +90,7 @@ def get_normalized_request_content(model, req): raise fault.IDMFault("I don't understand the content type ", code=415) return ret + def send_result(code, req, result): content = None resp = Response() @@ -91,11 +104,12 @@ def send_result(code, req, result): resp.headers['Content-Type'] = "application/json" resp.status = code if code > 399: - #return bottle.abort(code, content) - return; + #return bottle.abort(code, content) + return return content -class Controller(wsgi.Controller): + +class AuthController(wsgi.Controller): def __init__(self, options): self.options = options @@ -104,12 +118,183 @@ class Controller(wsgi.Controller): creds = get_normalized_request_content(auth.PasswordCredentials, req) return send_result(200, req, service.authenticate(creds)) - def validate_token(self, req): + def validate_token(self, req, token_id): belongs_to = None if "belongsTo" in req.GET: belongs_to = req.GET["belongsTo"] - rval = service.validate_token(get_auth_token(), token_id, belongs_to) - return send_result(200, rval) + rval = service.validate_token(get_auth_token(req), token_id, belongs_to) + return send_result(200, req, rval) + + def delete_token(self, req, token_id): + return send_result(204, req, service.revoke_token(get_auth_token(req), token_id)) + + +class TenantController(wsgi.Controller): + + def __init__(self, options): + self.options = options + + def create_tenant(self, req): + tenant = get_normalized_request_content(tenants.Tenant, req) + return send_result(201, req, + service.create_tenant(get_auth_token(req), tenant)) + + def get_tenants(self, req): + marker = None + if "marker" in req.GET: + marker = req.GET["marker"] + + if "limit" in req.GET: + limit = req.GET["limit"] + else: + limit = 10 + + url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'], + req.environ.get("SERVER_NAME"), + req.environ.get("SERVER_PORT"), + req.environ['PATH_INFO']) + + tenants = service.get_tenants(get_auth_token(req), marker, limit, url) + return send_result(200, req, tenants) + + def get_tenant(self, req, tenant_id): + tenant = service.get_tenant(get_auth_token(req), tenant_id) + return send_result(200, req, tenant) + + def update_tenant(self, req, tenant_id): + tenant = get_normalized_request_content(tenants.Tenant, req) + rval = service.update_tenant(get_auth_token(req), tenant_id, tenant) + return send_result(200, req, rval) + + def delete_tenant(self, req, tenant_id): + rval = service.delete_tenant(get_auth_token(req), tenant_id) + return send_result(204, req, rval) + + + + # Tenant Group Methods + + def create_tenant_group(self, req, tenant_id): + group = get_normalized_request_content(tenants.Group, req) + return send_result(201, req, + service.create_tenant_group(get_auth_token(req), \ + tenant_id, group)) + + def get_tenant_groups(self, req, tenant_id): + marker = None + if "marker" in req.GET: + marker = req.GET["marker"] + + if "limit" in req.GET: + limit = req.GET["limit"] + else: + limit = 10 + + url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'], + req.environ.get("SERVER_NAME"), + req.environ.get("SERVER_PORT"), + req.environ['PATH_INFO']) + + groups = service.get_tenant_groups(get_auth_token(req), + tenant_id, marker, limit, url) + return send_result(200, req, groups) + + def get_tenant_group(self, req, tenant_id, group_id): + tenant = service.get_tenant_group(get_auth_token(req), tenant_id, + group_id) + return send_result(200, req, tenant) + + + def update_tenant_group(self, req, tenant_id, group_id): + group = get_normalized_request_content(tenants.Group, req) + rval = service.update_tenant_group(get_auth_token(req),\ + tenant_id, group_id, group) + return send_result(200, req, rval) + + def delete_tenant_group(self, req, tenant_id, group_id): + rval = service.delete_tenant_group(get_auth_token(req), tenant_id, + group_id) + return send_result(204, req, rval) + + def add_user_tenant_group(self, req, tenant_id, group_id, user_id): + # TBD + # IDMDevguide clarification needed on this property + return None + + def delete_user_tenant_group(self, req, tenant_id, group_id, user_id): + # TBD + # IDMDevguide clarification needed on this property + return None + def get_user_tenant_group(self, req, tenant_id, group_id, user_id): + # TBD + # IDMDevguide clarification needed on this property + return None + +class UserController(wsgi.Controller): + + def __init__(self, options): + self.options = options + + def create_user(self, req, tenant_id): + user = get_normalized_request_content(users.User, req) + return send_result(201, req, + service.create_user(get_auth_token(req), tenant_id, user)) + + def get_tenant_users(self, req, tenant_id): + marker = None + if "marker" in req.GET: + marker = req.GET["marker"] + if "limit" in req.GET: + limit = req.GET["limit"] + else: + limit = 10 + url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'], + req.environ.get("SERVER_NAME"), + req.environ.get("SERVER_PORT"), + req.environ['PATH_INFO']) + users = service.get_tenant_users(get_auth_token(req), tenant_id, marker, limit, url) + return send_result(200, req, users) + + def get_user_groups(self, req, tenant_id, user_id): + marker = None + if "marker" in req.GET: + marker = req.GET["marker"] + + if "limit" in req.GET: + limit = req.GET["limit"] + else: + limit = 10 + + url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],\ + req.environ.get("SERVER_NAME"),\ + req.environ.get("SERVER_PORT"),\ + req.environ['PATH_INFO']) + + groups = service.get_user_groups(get_auth_token(),\ + tenant_id,user_id, marker, limit,url) + return send_result(200, groups) + + def get_user(self, req, tenant_id, user_id): + user = service.get_user(get_auth_token(req), tenant_id, user_id) + return send_result(200, req, user) + + def update_user(self, req, user_id, tenant_id): + user = get_normalized_request_content(users.User_Update, req) + rval = service.update_user(get_auth_token(req), user_id, user, tenant_id) + return send_result(200, req, rval) + + def delete_user(self, req, user_id, tenant_id): + rval = service.delete_user(get_auth_token(req), user_id, tenant_id) + return send_result(204, req, rval) + + def set_user_password(self, req, user_id, tenant_id): + user = get_normalized_request_content(users.User_Update, req) + rval = service.set_user_password(get_auth_token(req), user_id, user, tenant_id) + return send_result(204, req, rval) + + def set_user_enabled(self, req, user_id,tenant_id): + rval = service.enable_disable_user(get_auth_token(req), user_id, tenant_id) + return send_result(204, req, rval) class Auth_API(wsgi.Router): @@ -118,10 +303,55 @@ class Auth_API(wsgi.Router): def __init__(self, options): self.options = options mapper = routes.Mapper() - controller = Controller(options) - mapper.connect("/v1.0/token", controller=controller, action="authenticate") - mapper.connect("/v1.0/token/{id}", controller=controller, - action="validate_token") + + # Token Operations + auth_controller = AuthController(options) + mapper.connect("/v1.0/token", controller=auth_controller, action="authenticate") + mapper.connect("/v1.0/token/{token_id}", controller=auth_controller, + action="validate_token", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/token/{token_id}", controller=auth_controller, + action="delete_token", conditions=dict(method=["DELETE"])) + + # Tenant Operations + tenant_controller = TenantController(options) + mapper.connect("/v1.0/tenants", controller=tenant_controller, + action="create_tenant", conditions=dict(method=["POST"])) + mapper.connect("/v1.0/tenants", controller=tenant_controller, + action="get_tenants", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller, + action="get_tenant", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller, + action="update_tenant", conditions=dict(method=["PUT"])) + mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller, + action="delete_tenant", conditions=dict(method=["DELETE"])) + + # Tenant Group Operations + + mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller, + action="create_tenant_group", conditions=dict(method=["POST"])) + mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller, + action="get_tenant_groups", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + action="get_tenant_group", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + action="update_tenant_group", conditions=dict(method=["PUT"])) + mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller, + action="delete_tenant_group", conditions=dict(method=["DELETE"])) + + # User Operations + user_controller = UserController(options) + mapper.connect("/v1.0/tenants/{tenant_id}/users", controller=user_controller, + action="create_user", conditions=dict(method=["POST"])) + mapper.connect("/v1.0/tenants/{tenant_id}/users", controller=user_controller, + action="get_tenant_users", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller, + action="get_user", conditions=dict(method=["GET"])) + mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller, + action="update_user", conditions=dict(method=["PUT"])) + mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller, + action="delete_user", conditions=dict(method=["DELETE"])) + + super(Auth_API, self).__init__(mapper) diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py index 47768c17..751721f5 100644 --- a/keystone/db/sqlalchemy/api.py +++ b/keystone/db/sqlalchemy/api.py @@ -17,7 +17,7 @@ from session import get_session -from sqlalchemy.orm import joinedload +from sqlalchemy.orm import joinedload,aliased import models @@ -44,7 +44,7 @@ def tenant_get_all(session=None): def tenant_get_page(marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Tenant).filter("id>:marker").params(\ marker = '%s' % marker).order_by\ @@ -53,8 +53,8 @@ def tenant_get_page(marker,limit,session=None): return session.query(models.Tenant).order_by(\ models.Tenant.id.desc()).limit(limit).all() #return session.query(models.Tenant).all() - - + + def tenant_get_page_markers(marker,limit,session=None): if not session: session = get_session() @@ -120,7 +120,7 @@ def tenant_group_is_empty( id, session=None): group_id=id).first() if a_user != None: return False - + return True def tenant_delete(id, session=None): @@ -142,13 +142,13 @@ def tenant_group_get(id, tenant, session=None): if not session: session = get_session() result = session.query(models.Group).filter_by(id=id, tenant_id=tenant).first() - + return result def tenant_group_get_page(tenantId, marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Group).filter("id>:marker").params(\ marker = '%s' % marker).filter_by(\ @@ -158,8 +158,8 @@ def tenant_group_get_page(tenantId, marker,limit,session=None): return session.query(models.Group).filter_by(tenant_id=tenantId)\ .order_by(models.Group.id.desc()).limit(limit).all() #return session.query(models.Tenant).all() - - + + def tenant_group_get_page_markers(tenantId, marker,limit,session=None): if not session: session = get_session() @@ -213,6 +213,13 @@ def tenant_group_delete(id,tenant_id, session=None): tenantgroup_ref = tenant_group_get(id,tenant_id, session) session.delete(tenantgroup_ref) +def user_get_by_group(user_id, group_id, session=None): + if not session: + session = get_session() + result = session.query(models.UserGroupAssociation).filter_by( + group_id=group_id, user_id=user_id).first() + return result + def user_create(values): user_ref = models.User() @@ -229,14 +236,6 @@ def user_get(id, session=None): return result -def user_get_by_tenant(tenant_id, session=None): - if not session: - session = get_session() - result = session.query(models.UserTenantAssociation).filter_by( - tenant_id=tenant_id) - return result - - def user_groups(id, session=None): if not session: session = get_session() @@ -254,14 +253,6 @@ def user_update(id, values, session=None): user_ref.save(session=session) -def user_delete(id, session=None): - if not session: - session = get_session() - with session.begin(): - user_ref = user_get(id, session) - session.delete(user_ref) - - def group_get(id, session=None): if not session: session = get_session() @@ -272,43 +263,43 @@ def group_get(id, session=None): def group_users(id, session=None): if not session: session = get_session() - result = session.query(models.Users).filter_by( + result = session.query(models.User).filter_by( group_id=id) return result def users_tenant_group_get_page(group_id, marker,limit,session=None): if not session: session = get_session() - + if marker: - return session.query(models.Users).filter_by(\ + return session.query(models.User).filter_by(\ group_id=group_id).filter("id>:marker").params(\ marker = '%s' % marker).order_by\ - (models.Users.id.desc()).limit(limit).all() + (models.User.id.desc()).limit(limit).all() else: - return session.query(models.Users).filter_by(\ + return session.query(models.User).filter_by(\ group_id=group_id).order_by(\ - models.Users.id.desc()).limit(limit).all() - - - + models.User.id.desc()).limit(limit).all() + + + def users_tenant_group_get_page_markers(group_id, marker,limit,session=None): if not session: session = get_session() - first = session.query(models.Users).order_by(\ - models.Users.id).first() - last = session.query(models.Users).order_by(\ - models.Users.id.desc()).first() + first = session.query(models.User).order_by(\ + models.User.id).first() + last = session.query(models.User).order_by(\ + models.User.id.desc()).first() if marker is None: marker=first.id - next=session.query(models.Users).filter_by(\ + next=session.query(models.User).filter_by(\ group_id=group_id).filter("id > :marker").params(\ marker = '%s' % marker).order_by(\ - models.Users.id).limit(limit).all() - prev=session.query(models.Users).filter_by(\ + models.User.id).limit(limit).all() + prev=session.query(models.User).filter_by(\ group_id=group_id).filter("id < :marker").params(\ marker = '%s' % marker).order_by(\ - models.Users.id.desc()).limit(int(limit)).all() + models.User.id.desc()).limit(int(limit)).all() if len(next) == 0: next=last else: @@ -339,7 +330,7 @@ def group_get_all(session=None): def group_get_page(marker,limit,session=None): if not session: session = get_session() - + if marker: return session.query(models.Group).filter("id>:marker").params(\ marker = '%s' % marker).order_by\ @@ -347,9 +338,9 @@ def group_get_page(marker,limit,session=None): else: return session.query(models.Group).order_by(\ models.Group.id.desc()).limit(limit).all() - - - + + + def group_get_page_markers(marker,limit,session=None): if not session: session = get_session() @@ -422,3 +413,206 @@ def token_for_user(user_id, session=None): result = session.query(models.Token).filter_by( user_id=user_id).order_by("expires desc").first() return result + +def user_tenant_create(values): + user_tenant_ref = models.UserTenantAssociation() + user_tenant_ref.update(values) + user_tenant_ref.save() + return user_tenant_ref + +def user_get_update(id, session=None): + if not session: + session = get_session() + result = session.query(models.User).filter_by(id=id).first() + return result + +def user_get_email(email, session=None): + if not session: + session = get_session() + result = session.query(models.User).filter_by(email=email).first() + return result + +def users_get_by_tenant_get_page(tenant_id, marker, limit, session=None): + if not session: + session = get_session() + uta = aliased(models.UserTenantAssociation) + user = aliased(models.User) + if marker: + return session.query(user, uta).join( + (uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).\ + filter("id>=:marker").params( + marker='%s' % marker).order_by( + user.id).limit(limit).all() + else: + return session.query(user, uta).\ + join((uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).order_by( + user.id).limit(limit).all() + +def users_get_by_tenant_get_page_markers(tenant_id, marker, limit, session=None): + if not session: + session = get_session() + uta = aliased(models.UserTenantAssociation) + user = aliased(models.User) + first, firstassoc = session.query(user, uta).\ + join((uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).\ + order_by(user.id).first() + last, lastassoc = session.query(user, uta).\ + join((uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).\ + order_by(user.id.desc()).first() + if marker is None: + marker = first.id + next = session.query(user, uta).join((uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).\ + filter("id >= :marker").params( + marker='%s' % marker).order_by( + user.id).limit(int(limit) + 1).all() + prev = session.query(user, uta).join((uta, uta.user_id == user.id)).\ + filter(uta.tenant_id == tenant_id).\ + filter("id < :marker").params( + marker='%s' % marker).order_by( + user.id.desc()).limit(int(limit)).all() + next_len = len(next) + prev_len = len(prev) + print next_len, prev_len + if next_len == 0: + next = last + else: + for t, a in next: + next = t + if prev_len == 0: + prev = first + else: + for t, a in prev: + prev = t + if first.id == marker: + prev = None + else: + prev = prev.id + if marker == last.id: + next = None + else: + next = next.id + return (prev, next) + +def user_groups_get_all(user_id, session=None): + if not session: + session = get_session() + uga = aliased(models.UserGroupAssociation) + group = aliased(models.Group) + return session.query(group, uga).\ + join((uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).order_by( + group.id).all() + +def groups_get_by_user_get_page(user_id, marker, limit, session=None): + if not session: + session = get_session() + uga = aliased(models.UserGroupAssociation) + group = aliased(models.Group) + if marker: + return session.query(group, uga).join( + (uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).\ + filter("id>=:marker").params( + marker='%s' % marker).order_by( + group.id).limit(limit).all() + else: + return session.query(group, uga).\ + join((uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).order_by( + group.id).limit(limit).all() +def groups_get_by_user_get_page_markers(user_id, marker, limit, session=None): + if not session: + session = get_session() + uga = aliased(models.UserGroupAssociation) + group = aliased(models.Group) + first, firstassoc = session.query(group, uga).\ + join((uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).\ + order_by(group.id).first() + last, lastassoc = session.query(group, uga).\ + join((uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).\ + order_by(group.id.desc()).first() + if marker is None: + marker = first.id + next = session.query(group, uga).join( + (uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).\ + filter("id>=:marker").params( + marker='%s' % marker).order_by( + group.id).limit(int(limit)).all() + + + prev = session.query(group, uga).join( + (uga, uga.group_id == group.id)).\ + filter(uga.user_id == user_id).\ + filter("id < :marker").params( + marker='%s' % marker).order_by( + group.id).limit(int(limit) + 1).all() + next_len = len(next) + prev_len = len(prev) + print next_len, prev_len + if next_len == 0: + next = last + else: + for t, a in next: + next = t + if prev_len == 0: + prev = first + else: + for t, a in prev: + prev = t + if first.id == marker: + prev = None + else: + prev = prev.id + if marker == last.id: + next = None + else: + next = next.id + return (prev, next) + + +def user_delete(id, session=None): + if not session: + session = get_session() + with session.begin(): + user_ref = user_get(id, session) + session.delete(user_ref) + +def user_get_by_tenant(id, tenant_id, session=None): + if not session: + session = get_session() + user_tenant = session.query(models.UserTenantAssociation).filter_by( + tenant_id=tenant_id, user_id=id).first() + + return user_tenant + +def user_delete_tenant(id, tenantId, session=None): + if not session: + session = get_session() + with session.begin(): + user_ref = user_get_by_tenant(id, tenantId, session) + session.delete(user_ref) + user_ref = user_get(id, session) + session.delete(user_ref) + +def user_tenant_group(values): + user_ref = models.UserGroupAssociation() + user_ref.update(values) + user_ref.save() + return user_ref + + +def user_tenant_group_delete(id, group_id, session=None): + if not session: + session = get_session() + with session.begin(): + usertenantgroup_ref = user_get_by_group(id, group_id, session) + session.delete(usertenantgroup_ref) + diff --git a/keystone/logic/service.py b/keystone/logic/service.py index ef76867b..cc852af0 100644 --- a/keystone/logic/service.py +++ b/keystone/logic/service.py @@ -20,6 +20,7 @@ import keystone.logic.types.auth as auth import keystone.logic.types.tenant as tenants import keystone.logic.types.atom as atom import keystone.logic.types.fault as fault +import keystone.logic.types.user as users import keystone.db.sqlalchemy.api as db_api import keystone.db.sqlalchemy.models as db_models @@ -124,12 +125,12 @@ class IDMService(object): # dtenant.desc, dtenant.enabled)) # return tenants.Tenants(ts, []) - - + + ## ## GET Tenants with Pagination ## - + def get_tenants(self, admin_token, marker, limit, url): self.__validate_token(admin_token) @@ -142,13 +143,13 @@ class IDMService(object): links=[] if prev: links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: + if next: links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + + return tenants.Tenants(ts, links) - + def get_tenant(self, admin_token, tenant_id): self.__validate_token(admin_token) @@ -188,11 +189,11 @@ class IDMService(object): db_api.tenant_delete(dtenant.id) return None - + # # Tenant Group Operations # - + def create_tenant_group(self, admin_token, tenant, group): self.__validate_token(admin_token) @@ -201,7 +202,7 @@ class IDMService(object): if tenant == None: raise fault.BadRequestFault("Expecting a Tenant Id") - + dtenant = db_api.tenant_get(tenant) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") @@ -209,7 +210,7 @@ class IDMService(object): if group.group_id == None: raise fault.BadRequestFault("Expecting a Group Id") - + if db_api.group_get(group.group_id) != None: raise fault.TenantGroupConflictFault( "A tenant group with that id already exists") @@ -223,20 +224,20 @@ class IDMService(object): return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id) - - + + def get_tenant_groups(self, admin_token, tenantId, marker, limit, url): self.__validate_token(admin_token) if tenantId == None: raise fault.BadRequestFault("Expecting a Tenant Id") - + dtenant = db_api.tenant_get(tenantId) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + ts = [] dtenantgroups = db_api.tenant_group_get_page(tenantId, marker,limit) - + for dtenantgroup in dtenantgroups: ts.append(tenants.Group(dtenantgroup.id, dtenantgroup.desc, dtenantgroup.tenant_id)) @@ -244,27 +245,27 @@ class IDMService(object): links=[] if prev: links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: + if next: links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + + return tenants.Groups(ts, links) - + def get_tenant_group(self, admin_token, tenant_id, group_id): self.__validate_token(admin_token) - + dtenant = db_api.tenant_get(tenant_id) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") - - + + return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id) - - + + def update_tenant_group(self, admin_token, tenant_id, group_id, group): self.__validate_token(admin_token) @@ -275,32 +276,32 @@ class IDMService(object): dtenant = db_api.tenant_get(tenant_id) if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") - + if group_id != group.group_id: raise fault.BadRequestFault("Wrong Data Provided,Group id not matching") - + if str(tenant_id) != str(group.tenant_id): - raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ") - + raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ") + values = {'desc': group.description} db_api.tenant_group_update(group_id, tenant_id, values) return tenants.Group(group_id, group.description, tenant_id) - + def delete_tenant_group(self, admin_token, tenant_id, group_id): self.__validate_token(admin_token) dtenant = db_api.tenant_get(tenant_id) - + if dtenant == None: raise fault.ItemNotFoundFault("The tenant not found") - + dtenant = db_api.tenant_group_get(group_id, tenant_id) if not dtenant: raise fault.ItemNotFoundFault("The tenant group not found") @@ -311,9 +312,10 @@ class IDMService(object): db_api.tenant_group_delete(group_id, tenant_id) return None - - - def get_users_tenant_group(self, admin_token, tenantId, groupId, marker, limit, url): + + + def get_users_tenant_group(self, admin_token, tenantId, groupId, marker, + limit, url): self.__validate_token(admin_token) if tenantId == None: raise fault.BadRequestFault("Expecting a Tenant Id") @@ -324,24 +326,78 @@ class IDMService(object): if db_api.tenant_group_get(groupId, tenantId) == None: raise fault.ItemNotFoundFault( "A tenant group with that id not found") - ts = [] - - dgroupusers = db_api.users_tenant_group_get_page( groupId, marker,limit) - for dgroupuser in dgroupusers: + dgroupusers = db_api.users_tenant_group_get_page(groupId, marker, + limit) + for dgroupuser, dgroupuserAsso in dgroupusers: + ts.append(tenants.User(dgroupuser.id, - dtenantgroup.email, tenantId, dtenantgroup.enabled)) - prev,next=db_api.users_tenant_group_get_page_markers( groupId, marker, limit) - links=[] - if prev: - links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit))) - if next: - links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit))) - - + dgroupuser.email, dgroupuser.enabled, + tenantId, None)) + links = [] + if ts.__len__(): + prev, next = db_api.users_tenant_group_get_page_markers(groupId, + marker, limit) + if prev: + links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" % + (url, prev, limit))) + if next: + links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" % + (url, next, limit))) return tenants.Users(ts, links) + def add_user_tenant_group(self, admin_token, tenant, group, user): + self.__validate_token(admin_token) + + if db_api.tenant_get(tenant) == None: + raise fault.ItemNotFoundFault("The Tenant not found") + + if db_api.group_get(group) == None: + raise fault.ItemNotFoundFault("The Group not found") + duser = db_api.user_get(user) + if duser == None: + raise fault.ItemNotFoundFault("The User not found") + + if db_api.tenant_group_get(group, tenant) == None: + raise fault.ItemNotFoundFault("A tenant group with" + " that id not found") + + if db_api.user_get_by_group(user, group) != None: + raise fault.UserGroupConflictFault( + "A user with that id already exists in group") + + dusergroup = db_models.UserGroupAssociation() + dusergroup.user_id = user + dusergroup.group_id = group + db_api.user_tenant_group(dusergroup) + + return tenants.User(duser.id, duser.email, duser.enabled, + tenant, group) + def delete_user_tenant_group(self, admin_token, tenant, group, user): + self.__validate_token(admin_token) + + if db_api.tenant_get(tenant) == None: + raise fault.ItemNotFoundFault("The Tenant not found") + + if db_api.group_get(group) == None: + raise fault.ItemNotFoundFault("The Group not found") + duser = db_api.user_get(user) + if duser == None: + raise fault.ItemNotFoundFault("The User not found") + + if db_api.tenant_group_get(group, tenant) == None: + raise fault.ItemNotFoundFault("A tenant group with" + " that id not found") + + if db_api.user_get_by_group(user, group) == None: + raise fault.ItemNotFoundFault("A user with that id " + "in a group not found") + + db_api.user_tenant_group_delete(user, group) + return None + + # # Private Operations # @@ -356,6 +412,442 @@ class IDMService(object): user = db_api.user_get(token.user_id) return (token, user) + # + # User Operations + # + def create_user(self, admin_token, tenant_id, user): + self.__validate_token(admin_token) + + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + if not isinstance(user, users.User): + raise fault.BadRequestFault("Expecting a User") + + if user.user_id == None: + raise fault.BadRequestFault("Expecting a unique User Id") + + if db_api.user_get(user.user_id) != None: + raise fault.UserConflictFault( + "An user with that id already exists") + + if db_api.user_get_email(user.email) != None: + raise fault.EmailConflictFault( + "Email already exists") + + + duser_tenant=db_models.UserTenantAssociation() + duser_tenant.user_id=user.user_id + duser_tenant.tenant_id=tenant_id + db_api.user_tenant_create(duser_tenant) + + duser = db_models.User() + duser.id = user.user_id + duser.password = user.password + duser.email = user.email + duser.enabled = user.enabled + db_api.user_create(duser) + + return user + + def get_tenant_users(self, admin_token, tenant_id, marker, limit,url): + self.__validate_token(admin_token) + + if tenant_id == None: + raise fault.BadRequestFault("Expecting a Tenant Id") + + if db_api.tenant_get(tenant_id) == None: + raise fault.ItemNotFoundFault("The tenant not found") + ts = [] + dtenantusers = db_api.users_get_by_tenant_get_page(tenant_id, marker, + limit) + for dtenantuser, dtenantuserAsso in dtenantusers: + ts.append(users.User(None,dtenantuser.id,tenant_id, + dtenantuser.email, dtenantuser.enabled)) + links = [] + if ts.__len__(): + prev, next =db_api.users_get_by_tenant_get_page_markers(tenant_id, + marker, limit) + if prev: + links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" % + (url, prev, limit))) + if next: + links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" % + (url, next, limit))) + return users.Users(ts, links) + + def get_user(self, admin_token, tenant_id, user_id): + self.__validate_token(admin_token) + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + duser = db_api.user_get(user_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be found") + + if not duser.enabled: + raise fault.UserDisabledFault("User has been disabled") + + if len(duser.tenants) > 0: + tenant_user = duser.tenants[0].tenant_id + else: + tenant_user = tenant_id + + ts = [] + dusergroups = db_api.user_groups_get_all(user_id) + + for dusergroup, dusergroupAsso in dusergroups: + + + ts.append(tenants.Group(dusergroup.id,dusergroup.tenant_id,None)) + + return users.User_Update(None,duser.id, tenant_user, duser.email, \ + duser.enabled,ts ) + + def update_user(self, admin_token, user_id, user,tenant_id): + self.__validate_token(admin_token) + + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + duser = db_api.user_get(user_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be found") + + if not duser.enabled: + raise fault.UserDisabledFault("User has been disabled") + + + if not isinstance(user, users.User): + raise fault.BadRequestFault("Expecting a User") + True + duser = db_api.user_get_update(user_id) + if duser == None: + raise fault.ItemNotFoundFault("The user could not be found") + if db_api.user_get_email(user.email) != None: + raise fault.EmailConflictFault( + "Email already exists") + + values = {'email': user.email} + + db_api.user_update(user_id, values) + duser = db_api.user_get_update(user_id) + return users.User(duser.password, duser.id, tenant_id, duser.email, \ + duser.enabled) + + def set_user_password(self, admin_token, user_id, user,tenant_id): + self.__validate_token(admin_token) + + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + duser = db_api.user_get(user_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be found") + + if not duser.enabled: + raise fault.UserDisabledFault("User has been disabled") + + + if not isinstance(user, users.User): + raise fault.BadRequestFault("Expecting a User") + True + duser = db_api.user_get(user_id) + if duser == None: + raise fault.ItemNotFoundFault("The user could not be found") + + values = {'password': user.password} + + db_api.user_update(user_id, values) + + return users.User(user.password, '', '', '', '') + + def enable_disable_user(self, admin_token, user_id, user,tenant_id): + self.__validate_token(admin_token) + + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + duser = db_api.user_get(user_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be found") + + if not duser.enabled: + raise fault.UserDisabledFault("User has been disabled") + + + if not isinstance(user, users.User): + raise fault.BadRequestFault("Expecting a User") + True + duser = db_api.user_get(user_id) + if duser == None: + raise fault.ItemNotFoundFault("The user could not be found") + + values = {'enabled': user.enabled} + + db_api.user_update(user_id, values) + + return users.User('','','','',user.enabled) + + def delete_user(self, admin_token, user_id, tenant_id): + self.__validate_token(admin_token) + dtenant = db_api.tenant_get(tenant_id) + if dtenant == None: + raise fault.UnauthorizedFault("Unauthorized") + if not dtenant.enabled: + raise fault.TenantDisabledFault("Your account has been disabled") + + duser = db_api.user_get(user_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be found") + duser = db_api.user_get_by_tenant(user_id, tenant_id) + if not duser: + raise fault.ItemNotFoundFault("The user could not be " + "found under given tenant") + + db_api.user_delete_tenant(user_id, tenant_id) + return None + + def get_user_groups(self, admin_token, tenant_id,user_id, marker, limit, url): + self.__validate_token(admin_token) + + if tenant_id == None: + raise fault.BadRequestFault("Expecting a Tenant Id") + + if db_api.tenant_get(tenant_id) == None: + raise fault.ItemNotFoundFault("The tenant not found") + + ts = [] + dusergroups = db_api.groups_get_by_user_get_page(user_id, marker, + limit) + print dusergroups + for dusergroup, dusergroupAsso in dusergroups: + + + ts.append(tenants.Group(dusergroup.id,dusergroup.desc,dusergroup.tenant_id)) + links = [] + if ts.__len__(): + prev, next =db_api.groups_get_by_user_get_page_markers(user_id, marker, + limit) + if prev: + links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" % + (url, prev, limit))) + if next: + links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" % + (url, next, limit))) + return tenants.Groups(ts, links) + + # + + # + # Global Group Operations + # + + def __check_create_global_tenant(self): + + dtenant = db_api.tenant_get('GlobalTenant') + + if dtenant is None: + dtenant = db_models.Tenant() + dtenant.id = 'GlobalTenant' + dtenant.desc = 'GlobalTenant is Default tenant for global groups' + dtenant.enabled = True + db_api.tenant_create(dtenant) + return dtenant + + def create_global_group(self, admin_token, group): + self.__validate_token(admin_token) + + if not isinstance(group, tenants.Group): + raise fault.BadRequestFault("Expecting a Group") + + if group.group_id == None: + raise fault.BadRequestFault("Expecting a Group Id") + + if db_api.group_get(group.group_id) != None: + raise fault.TenantGroupConflictFault( + "A tenant group with that id already exists") + gtenant = self.__check_create_global_tenant() + dtenant = db_models.Group() + dtenant.id = group.group_id + dtenant.desc = group.description + dtenant.tenant_id = gtenant.id + db_api.tenant_group_create(dtenant) + return tenants.Group(dtenant.id, dtenant.desc, None) + + def get_global_groups(self, admin_token, marker, limit, url): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + ts = [] + dtenantgroups = db_api.tenant_group_get_page(gtenant.id, \ + marker, limit) + + for dtenantgroup in dtenantgroups: + ts.append(tenants.Group(dtenantgroup.id, + dtenantgroup.desc)) + prev, next = db_api.tenant_group_get_page_markers(gtenant.id, + marker, limit) + links = [] + if prev: + links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" % + (url, prev, limit))) + if next: + links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" % + (url, next, limit))) + return tenants.Groups(ts, links) + + def get_global_group(self, admin_token, group_id): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + dtenant = db_api.tenant_get(gtenant.id) + if dtenant == None: + raise fault.ItemNotFoundFault("The Global tenant not found") + + dtenant = db_api.tenant_group_get(group_id, gtenant.id) + if not dtenant: + raise fault.ItemNotFoundFault("The Global tenant group not found") + return tenants.Group(dtenant.id, dtenant.desc) + + def update_global_group(self, admin_token, group_id, group): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + if not isinstance(group, tenants.Group): + raise fault.BadRequestFault("Expecting a Group") + + dtenant = db_api.tenant_get(gtenant.id) + if dtenant == None: + raise fault.ItemNotFoundFault("The global tenant not found") + + dtenant = db_api.tenant_group_get(group_id, gtenant.id) + if not dtenant: + raise fault.ItemNotFoundFault("The Global tenant group not found") + if group_id != group.group_id: + raise fault.BadRequestFault("Wrong Data Provided," + "Group id not matching") + + values = {'desc': group.description} + db_api.tenant_group_update(group_id, gtenant.id, values) + return tenants.Group(group_id, group.description, gtenant.id) + + def delete_global_group(self, admin_token, group_id): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + dtenant = db_api.tenant_get(gtenant.id) + + if dtenant == None: + raise fault.ItemNotFoundFault("The global tenant not found") + + dtenant = db_api.tenant_group_get(group_id, gtenant.id) + if not dtenant: + raise fault.ItemNotFoundFault("The global tenant group not found") + + if not db_api.tenant_group_is_empty(group_id): + raise fault.ForbiddenFault("You may not delete a group that " + "contains users") + + db_api.tenant_group_delete(group_id, gtenant.id) + return None + + def get_users_global_group(self, admin_token, groupId, marker, limit, url): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + if gtenant.id == None: + raise fault.BadRequestFault("Expecting a global Tenant") + + if db_api.tenant_get(gtenant.id) == None: + raise fault.ItemNotFoundFault("The global tenant not found") + + if db_api.tenant_group_get(groupId, gtenant.id) == None: + raise fault.ItemNotFoundFault( + "A global tenant group with that id not found") + ts = [] + dgroupusers = db_api.users_tenant_group_get_page(groupId, marker, + limit) + for dgroupuser, dgroupuserassoc in dgroupusers: + ts.append(tenants.User(dgroupuser.id, dgroupuser.email, + dgroupuser.enabled)) + links = [] + if ts.__len__(): + prev, next = db_api.users_tenant_group_get_page_markers(groupId, + marker, limit) + if prev: + links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" + % (url, prev, limit))) + if next: + links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" + % (url, next, limit))) + return tenants.Users(ts, links) + + def add_user_global_group(self, admin_token, group, user): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + + if db_api.tenant_get(gtenant.id) == None: + raise fault.ItemNotFoundFault("The Global Tenant not found") + + if db_api.group_get(group) == None: + raise fault.ItemNotFoundFault("The Group not found") + duser = db_api.user_get(user) + if duser == None: + raise fault.ItemNotFoundFault("The User not found") + + if db_api.tenant_group_get(group, gtenant.id) == None: + raise fault.ItemNotFoundFault("A global tenant group with" + " that id not found") + + if db_api.user_get_by_group(user, group) != None: + raise fault.UserGroupConflictFault( + "A user with that id already exists in group") + + dusergroup = db_models.UserGroupAssociation() + dusergroup.user_id = user + dusergroup.group_id = group + db_api.user_tenant_group(dusergroup) + + return tenants.User(duser.id, duser.email, duser.enabled, + group_id = group) + + def delete_user_global_group(self, admin_token, group, user): + self.__validate_token(admin_token) + gtenant = self.__check_create_global_tenant() + + if db_api.tenant_get(gtenant.id) == None: + raise fault.ItemNotFoundFault("The Global Tenant not found") + + if db_api.group_get(group) == None: + raise fault.ItemNotFoundFault("The Group not found") + duser = db_api.user_get(user) + if duser == None: + raise fault.ItemNotFoundFault("The User not found") + + if db_api.tenant_group_get(group, gtenant.id) == None: + raise fault.ItemNotFoundFault("A global tenant group with " + "that id not found") + + if db_api.user_get_by_group(user, group) == None: + raise fault.ItemNotFoundFault("A user with that id in a " + "group not found") + + db_api.user_tenant_group_delete(user, group) + return None + + # + def __get_auth_data(self, dtoken, duser): """return AuthData object for a token/user pair""" diff --git a/keystone/logic/types/fault.py b/keystone/logic/types/fault.py index fd2e85cc..db8ef90d 100644 --- a/keystone/logic/types/fault.py +++ b/keystone/logic/types/fault.py @@ -79,14 +79,6 @@ class UnauthorizedFault(IDMFault): self.key = "unauthorized" -class UserDisabledFault(IDMFault): - "The user is disabled" - - def __init__(self, msg, details=None, code=403): - super(UserDisabledFault, self).__init__(msg, details, code) - self.key = "userDisabled" - - class ForbiddenFault(IDMFault): "The user is forbidden" @@ -94,6 +86,13 @@ class ForbiddenFault(IDMFault): super(ForbiddenFault, self).__init__(msg, details, code) self.key = "forbidden" +class TenantDisabledFault(IDMFault): + "The tenant is disabled" + + def __init__(self, msg, details=None, code=403): + super(TenantDisabledFault, self).__init__(msg, details, code) + self.key = "tenantDisabled" + class ItemNotFoundFault(IDMFault): "The item is not found" @@ -126,3 +125,31 @@ class OverlimitFault(IDMFault): self.args = (code, msg, details, retry_at) self.retry_at = retry_at self.key = "overLimit" + +class UserConflictFault(IDMFault): + "The User already exists?" + + def __init__(self, msg, details=None, code=409): + super(UserConflictFault, self).__init__(msg, details, code) + self.key = "userConflict" + +class UserDisabledFault(IDMFault): + "The user is disabled" + + def __init__(self, msg, details=None, code=403): + super(UserDisabledFault, self).__init__(msg, details, code) + self.key = "userDisabled" + +class EmailConflictFault(IDMFault): + "The Email already exists?" + + def __init__(self, msg, details=None, code=409): + super(EmailConflictFault, self).__init__(msg, details, code) + self.key = "emailConflict" + +class UserGroupConflictFault(IDMFault): + "The user already exists in group?" + + def __init__(self, msg, details=None, code=409): + super(UserGroupConflictFault, self).__init__(msg, details, code) + self.key = "userGroupConflict" diff --git a/keystone/logic/types/user.py b/keystone/logic/types/user.py new file mode 100644 index 00000000..a2496806 --- /dev/null +++ b/keystone/logic/types/user.py @@ -0,0 +1,268 @@ +# Copyright (c) 2010-2011 OpenStack, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import keystone.logic.types.fault as fault +from lxml import etree +import string + + +class User(object): + "A user." + + def __init__(self, password,user_id, tenant_id, email,enabled): + self.user_id = user_id + self.tenant_id = tenant_id + self.password = password + self.email = email + self.enabled = enabled and True or False + + @staticmethod + def from_xml(xml_str): + try: + dom = etree.Element("root") + dom.append(etree.fromstring(xml_str)) + root = dom.find("{http://docs.openstack.org/idm/api/v1.0}user") + if root == None: + raise fault.BadRequestFault("Expecting User") + user_id=root.get("id") + tenant_id = root.get("tenantId") + email = root.get("email") + password = root.get("password") + enabled = root.get("enabled") + if user_id == None: + raise fault.BadRequestFault("Expecting User") + elif tenant_id == None: + raise fault.BadRequestFault("Expecting User tenant") + elif password == None: + raise fault.BadRequestFault("Expecting User password") + elif email == None: + raise fault.BadRequestFault("Expecting User email") + if enabled == None or enabled == "true" or enabled == "yes": + set_enabled = True + elif enabled == "false" or enabled == "no": + set_enabled = False + else: + raise fault.BadRequestFault("Bad enabled attribute!") + if password == '': + password=user_id + return User(password,user_id,tenant_id,email,set_enabled) + except etree.LxmlError as e: + raise fault.BadRequestFault("Cannot parse User", str(e)) + + @staticmethod + def from_json(json_str): + try: + obj = json.loads(json_str) + print obj + if not "user" in obj: + raise fault.BadRequestFault("Expecting User") + user = obj["user"] + if not "id" in user: + user_id = None + else: + user_id = user["id"] + if not "password" in user: + raise fault.BadRequestFault("Expecting User Password") + password = user["password"] + if not "tenantId" in user: + raise fault.BadRequestFault("Expecting User Tenant") + tenant_id = user["tenantId"] + if not "email" in user: + raise fault.BadRequestFault("Expecting User Email") + email = user["email"] + if "enabled" in user: + set_enabled = user["enabled"] + if not isinstance(set_enabled, bool): + raise fault.BadRequestFault("Bad enabled attribute!") + else: + set_enabled=True + if password == '': + password=user_id + return User(password,user_id,tenant_id,email,set_enabled) + except (ValueError, TypeError) as e: + raise fault.BadRequestFault("Cannot parse Tenant", str(e)) + + def to_dom(self): + dom = etree.Element("user", + xmlns="http://docs.openstack.org/idm/api/v1.0") + if self.email: + dom.set("email", self.email) + if self.tenant_id: + dom.set("tenantId",self.tenant_id) + if self.user_id: + dom.set("id",self.user_id) + if self.enabled: + dom.set("enabled",string.lower(str(self.enabled))) + if self.password: + dom.set("password",self.password) + + + return dom + + def to_xml(self): + print '34' + return etree.tostring(self.to_dom()) + + def to_dict(self): + user = {} + + if self.user_id: + user["id"] = self.user_id + user["tenantId"]=self.tenant_id + if self.password: + user["password"]=self.password + user["email"]=self.email + user["enabled"]=self.enabled + return {'user': user} + + def to_json(self): + return json.dumps(self.to_dict()) + +class User_Update(object): + "A user." + + def __init__(self, password,user_id, tenant_id, email,enabled, group=None): + self.user_id = user_id + self.tenant_id = tenant_id + self.password = password + self.email = email + self.enabled = enabled and True or False + if group is not None: + self.group=group + @staticmethod + def from_xml(xml_str): + try: + dom = etree.Element("root") + dom.append(etree.fromstring(xml_str)) + root = dom.find("{http://docs.openstack.org/idm/api/v1.0}user") + if root == None: + raise fault.BadRequestFault("Expecting User") + user_id=root.get("id") + tenant_id = root.get("tenantId") + email = root.get("email") + password = root.get("password") + enabled = root.get("enabled") + if enabled == None or enabled == "true" or enabled == "yes": + set_enabled = True + elif enabled == "false" or enabled == "no": + set_enabled = False + else: + raise fault.BadRequestFault("Bad enabled attribute!") + if password == '': + password=user_id + return User(password,user_id,tenant_id,email,set_enabled) + except etree.LxmlError as e: + raise fault.BadRequestFault("Cannot parse User", str(e)) + + @staticmethod + def from_json(json_str): + try: + obj = json.loads(json_str) + print obj + if not "user" in obj: + raise fault.BadRequestFault("Expecting User") + user = obj["user"] + if not "id" in user: + user_id = None + else: + user_id = user["id"] + if not "password" in user: + password = None + else: + password=user["password"] + if not "tenantId" in user: + tenant_id=None + else: + tenant_id = user["tenantId"] + if not "email" in user: + email=None + else: + email = user["email"] + if "enabled" in user: + set_enabled = user["enabled"] + if not isinstance(set_enabled, bool): + raise fault.BadRequestFault("Bad enabled attribute!") + else: + set_enabled=True + if password == '': + password=user_id + return User(password,user_id,tenant_id,email,set_enabled) + except (ValueError, TypeError) as e: + raise fault.BadRequestFault("Cannot parse Tenant", str(e)) + + def to_dom(self): + dom = etree.Element("user", + xmlns="http://docs.openstack.org/idm/api/v1.0") + if self.email: + dom.set("email", self.email) + if self.tenant_id: + dom.set("tenantId",self.tenant_id) + if self.user_id: + dom.set("id",self.user_id) + if self.enabled: + dom.set("enabled",string.lower(str(self.enabled))) + if self.password: + dom.set("password",self.password) + if self.group: + print '78' + for group in self.group: + dom.append(group.to_dom()) + return dom + + def to_xml(self): + return etree.tostring(self.to_dom()) + + def to_dict(self): + user = {} + + if self.user_id: + user["id"] = self.user_id + if self.user_id: + user["tenantId"]=self.tenant_id + if self.password: + user["password"]=self.password + if self.email: + user["email"]=self.email + if self.enabled: + user["enabled"]=self.enabled + if self.group: + values=[t.to_dict()["group"] for t in self.group] + user["groups"] = {"values": values} + return {'user': user} + + def to_json(self): + return json.dumps(self.to_dict()) + +class Users(object): + "A collection of users." + + def __init__(self, values, links): + self.values = values + self.links = links + + def to_xml(self): + dom = etree.Element("users") + dom.set(u"xmlns", "http://docs.openstack.org/idm/api/v1.0") + for t in self.values: + dom.append(t.to_dom()) + for t in self.links: + dom.append(t.to_dom()) + return etree.tostring(dom) + + def to_json(self): + values = [t.to_dict()["user"] for t in self.values] + links = [t.to_dict()["links"] for t in self.links] + return json.dumps({"users": {"values": values, "links": links}})
\ No newline at end of file diff --git a/test/unit/test_identity.py b/test/unit/test_identity.py index 76125f79..ec537ab9 100644 --- a/test/unit/test_identity.py +++ b/test/unit/test_identity.py @@ -1,2093 +1,2092 @@ -import os
-import sys
-# Need to access identity module
-sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
- '..', '..', '..', '..', 'keystone')))
-from keystone import identity
-import unittest
-from webtest import TestApp
-import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-
-URL = 'http://localhost:8080/v1.0/'
-
-
-def get_token(user, pswd, kind=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": user,
- "password": pswd}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- token = str(content['auth']['token']['id'])
- if kind == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='', \
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": tenantid,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = {"group": {"id": groupid,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_global_group(auth_token):
- h = httplib2.Http(".cache")
-
- url = '%s/groups' % (URL)
- body = {"group": {"id": 'Admin',
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def get_token_xml(user, pswd, type=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="%s" username="%s" \
- tenantId="77654"/> ' % (pswd, user)
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- dom = etree.fromstring(content)
- root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
- token_root = root.attrib
- token = str(token_root['id'])
- if type == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token_xml(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % tenantid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % groupid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_global_group_xml(auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="Admin"> \
- <description>A description...</description> \
- </group>'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def get_tenant():
- return '1234'
-
-
-def get_user():
- return '1234'
-
-
-def get_userdisabled():
- return '1234'
-
-
-def get_auth_token():
- return '999888777666'
-
-
-def get_exp_auth_token():
- return '000999'
-
-
-def get_disabled_token():
- return '999888777'
-
-
-class identity_test(unittest.TestCase):
-
- #Given _a_ to make inherited test cases in an order.
- #here to call below method will call as last test case
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/json"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
-
-class authorize_test(identity_test):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
-
-
- def tearDown(self):
- delete_token(self.token, self.auth_token)
-
- def test_a_authorize(self):
- resp, content = get_token('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_authorize_xml(self):
- resp, content = get_token_xml('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_a_authorize_user_disaabled(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": "disabled",
- "password": "self.tenant_group='test_tenant_group'secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_disaabled_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_wrong(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username-w": "disabled",
- "password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_a_authorize_user_wrong_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username-w="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
-
-class validate_token(authorize_test):
-
- def test_validate_token_true(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_true_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_expired(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_expired_xml(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_invalid(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_invalid_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
-
-class tenant_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
- def tearDown(self):
- resp, content = delete_tenant(self.tenant, self.auth_token)
-""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
-"""
-
-class create_tenant_test(tenant_test):
-
- def test_tenant_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
-
- resp, content = create_tenant('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- def test_tenant_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- def test_tenant_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenants_test(tenant_test):
-
- def test_get_tenants(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenants_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_test(tenant_test):
-
- def test_get_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_test(tenant_test):
-
- def test_update_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description": "A NEW description..." ,\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body['tenant']['id']))
- self.assertEqual('A NEW description...', \
- body['tenant']['description'])
-
- def test_update_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description>A NEW description...</description> \
- </tenant>'
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description_bad": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '{"tenant": { "description": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_test(tenant_test):
-
- def test_delete_tenant_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
- def test_delete_tenant_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
-
-
-
-class tenant_group_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
- self.tenant_group = 'test_tenant_group'
-
- def tearDown(self):
- resp, content = delete_tenant_group('test_tenant_group', \
- self.tenant, self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
-
-class create_tenant_group_test(tenant_group_test):
-
- def test_tenant_group_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
- resp, content = create_tenant('test_tenant', str(self.auth_token))
-
- respG, contentG = delete_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_groups_test(tenant_group_test):
-
- def test_get_tenant_groups(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenant_groups_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_group_test(tenant_group_test):
-
- def test_get_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_group_test(tenant_group_test):
-
- def test_update_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
-
- data = '{"group": { "id":"%s","description": "A NEW description..." ,\
- "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(self.tenant_group, body['group']['id'])
- self.assertEqual('A NEW description...', \
- body['group']['description'])
-
- def test_update_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
-
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(str(self.tenant_group), str(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '{"group": { "description_bad": "A NEW description...",\
- "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description_bad>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
-
- data = '{"group": { "description": "A NEW description...",\
- "id":"NonexistingID", "tenantId"="test_tenant" }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="NonexistingID", "tenant_id"="test_tenant"> \
- <description_bad>A NEW description...</description> \
- </group>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_group_test(tenant_test):
-
- def test_delete_tenant_group_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group_xml("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
- def test_delete_tenant_group_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
-class create_global_group_test(global_group_test):
-
- def test_global_group_create(self):
-
- respG, contentG = delete_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group(str(self.auth_token))
- self.group = 'test_tenant_group'
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
-
- def test_global_group_create_again(self):
-
- respG, contentG = create_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
-if __name__ == '__main__':
- unittest.main()
+import os +import sys +# Need to access identity module +sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__), + '..', '..', '..', '..', 'keystone'))) +from keystone import identity +import unittest +from webtest import TestApp +import httplib2 +import json +from lxml import etree +import unittest +from webtest import TestApp + +URL = 'http://localhost:8080/v1.0/' + + +def get_token(user, pswd, kind=''): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username": user, + "password": pswd}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + token = str(content['auth']['token']['id']) + if kind == 'token': + return token + else: + return (resp, content) + + +def delete_token(token, auth_token): + h = httplib2.Http(".cache") + url = '%stoken/%s' % (URL, token) + resp, content = h.request(url, "DELETE", body='', \ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_tenant(tenantid, auth_token): + h = httplib2.Http(".cache") + + url = '%stenants' % (URL) + body = {"tenant": {"id": tenantid, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_tenant_group(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + + url = '%stenant/%s/groups' % (URL,tenantid) + body = {"group": {"id": groupid, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_tenant(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants/%s' % (URL, tenantid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_tenant_group(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def create_global_group(auth_token): + h = httplib2.Http(".cache") + + url = '%s/groups' % (URL) + body = {"group": {"id": 'Admin', + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": auth_token}) + return (resp, content) + + +def delete_global_group(groupid, auth_token): + h = httplib2.Http(".cache") + url = '%s/groups/%s' % (URL, groupid) + resp, content = h.request(url, "DELETE", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": auth_token}) + return (resp, content) + + +def get_token_xml(user, pswd, type=''): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="%s" username="%s" \ + tenantId="77654"/> ' % (pswd, user) + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + dom = etree.fromstring(content) + root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token") + token_root = root.attrib + token = str(token_root['id']) + if type == 'token': + return token + else: + return (resp, content) + + +def delete_token_xml(token, auth_token): + h = httplib2.Http(".cache") + url = '%stoken/%s' % (URL, token) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_tenant_xml(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % tenantid + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_tenant_group_xml(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups' % (URL,tenantid) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % groupid + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_tenant_xml(tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenants/%s' % (URL, tenantid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_tenant_group_xml(groupid, tenantid, auth_token): + h = httplib2.Http(".cache") + url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def create_global_group_xml(auth_token): + h = httplib2.Http(".cache") + url = '%s/groups' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="Admin"> \ + <description>A description...</description> \ + </group>' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def delete_global_group_xml(groupid, auth_token): + h = httplib2.Http(".cache") + url = '%s/groups/%s' % (URL, groupid) + resp, content = h.request(url, "DELETE", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": auth_token, + "ACCEPT": "application/xml"}) + return (resp, content) + + +def get_tenant(): + return '1234' + + +def get_user(): + return '1234' + + +def get_userdisabled(): + return '1234' + + +def get_auth_token(): + return '999888777666' + + +def get_exp_auth_token(): + return '000999' + + +def get_disabled_token(): + return '999888777' + + +class identity_test(unittest.TestCase): + + #Given _a_ to make inherited test cases in an order. + #here to call below method will call as last test case + + def test_a_get_version(self): + h = httplib2.Http(".cache") + url = URL + resp, content = h.request(url, "GET", body="", + headers={"Content-Type": "application/json"}) + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_a_get_version(self): + h = httplib2.Http(".cache") + url = URL + resp, content = h.request(url, "GET", body="", + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + +class authorize_test(identity_test): + + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + + + + def tearDown(self): + delete_token(self.token, self.auth_token) + + def test_a_authorize(self): + resp, content = get_token('joeuser', 'secrete') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_a_authorize_xml(self): + resp, content = get_token_xml('joeuser', 'secrete') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_a_authorize_user_disaabled(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username": "disabled", + "password": "self.tenant_group='test_tenant_group'secrete"}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_a_authorize_user_disaabled_xml(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="secrete" username="disabled" \ + />' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_a_authorize_user_wrong(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = {"passwordCredentials": {"username-w": "disabled", + "password": "secrete"}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + content = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_a_authorize_user_wrong_xml(self): + h = httplib2.Http(".cache") + url = '%stoken' % URL + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <passwordCredentials \ + xmlns="http://docs.openstack.org/idm/api/v1.0" \ + password="secrete" username-w="disabled" \ + />' + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + +class validate_token(authorize_test): + + def test_validate_token_true(self): + h = httplib2.Http(".cache") + + url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_true_xml(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_validate_token_expired(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_expired_xml(self): + h = httplib2.Http(".cache") + + url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + self.assertEqual('application/xml', resp['content-type']) + + def test_validate_token_invalid(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + def test_validate_token_invalid_xml(self): + h = httplib2.Http(".cache") + url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \ + self.tenant) + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/json", \ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + self.assertEqual('application/json', resp['content-type']) + + +class tenant_test(unittest.TestCase): + + def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + + def tearDown(self): + resp, content = delete_tenant(self.tenant, self.auth_token) +""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"} +""" + +class create_tenant_test(tenant_test): + + def test_tenant_create(self): + resp, content = delete_tenant('test_tenant', str(self.auth_token)) + + resp, content = create_tenant('test_tenant', str(self.auth_token)) + self.tenant = 'test_tenant' + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(resp['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(resp['status'])) + + def test_tenant_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + self.tenant = 'test_tenant' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(resp['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(resp['status'])) + + def test_tenant_create_again(self): + + resp, content = create_tenant("test_tenant", str(self.auth_token)) + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(resp['status'])) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + def test_tenant_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get("id") + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(resp['status'])) + if int(resp['status']) == 200: + self.tenant = content.get("id") + + def test_tenant_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = {"tenant": {"id": self.tenant, + "description": "A description ...", + "enabled": True}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = '{"tenant": { "id": "%s", \ + "description": "A description ...", "enabled"\ + :true } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_tenant_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenants' % (URL) + body = '{"tenant": { "id": "%s", \ + "description": "A description ...", "enabled"\ + :true } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_tenant_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenants' % (URL) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true" id="%s"> \ + <description>A description...</description> \ + </tenant>' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenants_test(tenant_test): + + def test_get_tenants(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenants_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenants_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenants_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + def test_get_tenants_exp_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + def test_get_tenants_exp_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_test(tenant_test): + + def test_get_tenant(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + + def test_get_tenant_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, 'tenant_bad') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, 'tenant_bad') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_get_tenant_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class update_tenant_test(tenant_test): + + def test_update_tenant(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '{"tenant": { "description": "A NEW description..." ,\ + "enabled":true }}' + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + body = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(int(self.tenant), int(body['tenant']['id'])) + self.assertEqual('A NEW description...', \ + body['tenant']['description']) + + def test_update_tenant_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description>A NEW description...</description> \ + </tenant>' + + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + body = etree.fromstring(content) + desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(int(self.tenant), int(body.get('id'))) + self.assertEqual('A NEW description...', \ + desc.text) + + def test_update_tenant_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '{"tenant": { "description_bad": "A NEW description...",\ + "enabled":true }}' + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/%s' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description_bad>A NEW description...</description> \ + </tenant>' + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + + def test_update_tenant_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + data = '{"tenant": { "description": "A NEW description...",\ + "enabled":true }}' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + def test_update_tenant_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenants/NonexistingID' % (URL) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \ + enabled="true"> \ + <description_bad>A NEW description...</description> \ + </tenant>' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class delete_tenant_test(tenant_test): + + def test_delete_tenant_not_found(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete111", \ + str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant_not_found_xml(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete111", \ + str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + + def test_delete_tenant(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(resp['status'])) + + def test_delete_tenant_xml(self): + resp, content = create_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(resp['status'])) + + + + +class tenant_group_test(unittest.TestCase): + +def setUp(self): + self.token = get_token('joeuser', 'secrete', 'token') + self.tenant = get_tenant() + self.user = get_user() + self.userdisabled = get_userdisabled() + self.auth_token = get_auth_token() + self.exp_auth_token = get_exp_auth_token() + self.disabled_token = get_disabled_token() + self.tenant_group = 'test_tenant_group' + +def tearDown(self): + resp, content = delete_tenant_group('test_tenant_group', \ + self.tenant, self.auth_token) + resp, content = delete_tenant(self.tenant, self.auth_token) + + +class create_tenant_group_test(tenant_group_test): + +def test_tenant_group_create(self): + resp, content = delete_tenant('test_tenant', str(self.auth_token)) + resp, content = create_tenant('test_tenant', str(self.auth_token)) + + respG, contentG = delete_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + 'test_tenant', str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + +def test_tenant_group_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + +def test_tenant_group_create_again(self): + + resp, content = create_tenant("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + +def test_tenant_group_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + content = etree.fromstring(content) + contentG = etree.fromstring(contentG) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + +def test_tenant_group_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + if int(respG['status']) == 200: + self.tenant_group = respG['group']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_groups_test(tenant_group_test): + +def test_get_tenant_groups(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + +def test_get_tenant_groups_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group_xml(self.tenant_group,\ + self.tenant, str(self.auth_token)) + + url = '%stenant/%s/groups' % (URL,self.tenant) + + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + +def test_get_tenant_groups_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_get_tenant_groups_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_get_tenant_groups_exp_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_get_tenant_groups_exp_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups' % (URL,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +class get_tenant_group_test(tenant_group_test): + +def test_get_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + +def test_get_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group_xml(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + +def test_get_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + +def test_get_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + +def test_get_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='{}',\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + +def test_get_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup') + + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body='',\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class update_tenant_group_test(tenant_group_test): + +def test_update_tenant_group(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + + data = '{"group": { "id":"%s","description": "A NEW description..." ,\ + "tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + body = json.loads(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(self.tenant_group, body['group']['id']) + self.assertEqual('A NEW description...', \ + body['group']['description']) + +def test_update_tenant_group_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + + body = etree.fromstring(content) + desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description") + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(200, int(resp['status'])) + self.assertEqual(str(self.tenant_group), str(body.get('id'))) + self.assertEqual('A NEW description...', \ + desc.text) + +def test_update_tenant_group_bad(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '{"group": { "description_bad": "A NEW description...",\ + "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant) + #test for Content-Type = application/json + + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + +def test_update_tenant_group_bad_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + tenantId="%s" id="%s"> \ + <description_bad>A NEW description...</description> \ + </group>' % (self.tenant, self.tenant_group) + #test for Content-Type = application/json + resp, content = h.request(url, "PUT", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(400, int(resp['status'])) + +def test_update_tenant_group_not_found(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + respG, contentG = create_tenant_group(self.tenant_group,\ + self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + + data = '{"group": { "description": "A NEW description...",\ + "id":"NonexistingID", "tenantId"="test_tenant" }}' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + +def test_update_tenant_group_not_found_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant(self.tenant, str(self.auth_token)) + url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant) + data = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="NonexistingID", "tenant_id"="test_tenant"> \ + <description_bad>A NEW description...</description> \ + </group>' + #test for Content-Type = application/json + resp, content = h.request(url, "GET", body=data,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": self.auth_token, + "ACCEPT": "application/xml"}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(404, int(resp['status'])) + + +class delete_tenant_group_test(tenant_test): + +def test_delete_tenant_group_not_found(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + +def test_delete_tenant_group_not_found_xml(self): + #resp,content=create_tenant("test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_group_xml("test_tenant_delete111", \ + "test_tenant", str(self.auth_token)) + self.assertEqual(404, int(resp['status'])) + +def test_delete_tenant_group(self): + resp, content = create_tenant("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) + +def test_delete_tenant_group_xml(self): + resp, content = create_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \ + "test_tenant_delete", str(self.auth_token)) + resp, content = delete_tenant_xml("test_tenant_delete", \ + str(self.auth_token)) + self.assertEqual(204, int(respG['status'])) + +class create_global_group_test(global_group_test): + +def test_global_group_create(self): + + respG, contentG = delete_global_group('test_tenant_group', \ + str(self.auth_token)) + respG, contentG = create_global_group(str(self.auth_token)) + self.group = 'test_tenant_group' + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + if int(respG['status']) not in (200, 201): + self.fail('Failed due to %d' % int(respG['status'])) + + +def test_global_group_create_again(self): + + respG, contentG = create_global_group('test_tenant_group', \ + str(self.auth_token)) + respG, contentG = create_global_group('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content['tenant']['id'] + self.tenant_group = contentG['group']['id'] + + +def test_tenant_group_create_forbidden_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + if int(respG['status']) == 200: + self.tenant_group = respG['group']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + + +def test_tenant_group_create_expired_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..." + }} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json", + "X-Auth-Token": self.exp_auth_token}) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_missing_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = {"group": {"id": self.tenant_group, + "description": "A description ..."}} + resp, content = h.request(url, "POST", body=json.dumps(body), + headers={"Content-Type": "application/json"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_disabled_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": self.disabled_token}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_invalid_token(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '{"group": { "id": "%s", \ + "description": "A description ..." } }' % self.tenant + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/json",\ + "X-Auth-Token": 'nonexsitingtoken'}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + + +def test_tenant_group_create_xml(self): + resp, content = delete_tenant_xml('test_tenant', str(self.auth_token)) + resp, content = create_tenant_xml('test_tenant', str(self.auth_token)) + respG, contentG = delete_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + self.tenant = 'test_tenant' + self.tenant_group = 'test_tenant_group' + content = etree.fromstring(content) + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + if int(respG['status']) not in (200, 201): + + self.fail('Failed due to %d' % int(respG['status'])) + +def test_tenant_group_create_again_xml(self): + + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + respG, contentG = create_tenant_group_xml('test_tenant_group', \ + "test_tenant", str(self.auth_token)) + + content = etree.fromstring(content) + contentG = etree.fromstring(contentG) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + + if int(respG['status']) == 500: + self.fail('IDM fault') + elif int(respG['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(409, int(respG['status'])) + if int(respG['status']) == 200: + self.tenant = content.get("id") + self.tenant_group = contentG.get("id") + +def test_tenant_group_create_forbidden_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant("test_tenant", str(self.auth_token)) + if int(resp['status']) == 200: + self.tenant = content['tenant']['id'] + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_expired_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant + + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", \ + "X-Auth-Token": self.exp_auth_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_missing_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + +def test_tenant_group_create_disabled_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml", + "X-Auth-Token": self.disabled_token, + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(403, int(resp['status'])) + +def test_tenant_group_create_invalid_token_xml(self): + h = httplib2.Http(".cache") + resp, content = create_tenant_xml("test_tenant", str(self.auth_token)) + content = etree.fromstring(content) + if int(resp['status']) == 200: + self.tenant = content.get('id') + + url = '%stenant/%s/groups' % (URL, self.tenant) + body = '<?xml version="1.0" encoding="UTF-8"?> \ + <group xmlns="http://docs.openstack.org/idm/api/v1.0" \ + id="%s"> \ + <description>A description...</description> \ + </group>' % self.tenant_group + resp, content = h.request(url, "POST", body=body,\ + headers={"Content-Type": "application/xml",\ + "X-Auth-Token": 'nonexsitingtoken', + "ACCEPT": "application/xml"}) + + if int(resp['status']) == 500: + self.fail('IDM fault') + elif int(resp['status']) == 503: + self.fail('Service Not Available') + self.assertEqual(401, int(resp['status'])) + + +if __name__ == '__main__': + unittest.main() |
