summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRamana Juvvadi <rjuvvadi@hcl.com>2011-05-07 13:08:13 -0500
committerRamana Juvvadi <rjuvvadi@hcl.com>2011-05-07 13:08:13 -0500
commit6eacad3fb064fd2ebfc99e1efafbc108a1b91090 (patch)
tree52c81a254b5506f15433339465265fac3b3259f2
parent2e1b2f77383f3214f5cb287682e4e864b3ee44aa (diff)
downloadkeystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.tar.gz
keystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.tar.xz
keystone-6eacad3fb064fd2ebfc99e1efafbc108a1b91090.zip
Added some more functions through Routes and mapper
-rwxr-xr-xkeystone/auth_server.py250
-rw-r--r--keystone/db/sqlalchemy/api.py286
-rw-r--r--keystone/logic/service.py592
-rw-r--r--keystone/logic/types/fault.py43
-rw-r--r--keystone/logic/types/user.py268
-rw-r--r--test/unit/test_identity.py4185
6 files changed, 3417 insertions, 2207 deletions
diff --git a/keystone/auth_server.py b/keystone/auth_server.py
index f0cf0a7c..ec8bf6df 100755
--- a/keystone/auth_server.py
+++ b/keystone/auth_server.py
@@ -58,15 +58,27 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'keystone', '__init__.py')):
from queryext import exthandler
from keystone.common import wsgi
import keystone.logic.service as serv
+import keystone.logic.types.tenant as tenants
import keystone.logic.types.auth as auth
+import keystone.logic.types.fault as fault
+import keystone.logic.types.user as users
service = serv.IDMService()
+
def is_xml_response(req):
if not "Accept" in req.headers:
return False
return req.content_type == "application/xml"
+
+def get_auth_token(req):
+ auth_token = None
+ if "X-Auth-Token" in req.headers:
+ auth_token = req.headers["X-Auth-Token"]
+ return auth_token
+
+
def get_normalized_request_content(model, req):
"""initialize a model from json/xml contents of request body"""
@@ -78,6 +90,7 @@ def get_normalized_request_content(model, req):
raise fault.IDMFault("I don't understand the content type ", code=415)
return ret
+
def send_result(code, req, result):
content = None
resp = Response()
@@ -91,11 +104,12 @@ def send_result(code, req, result):
resp.headers['Content-Type'] = "application/json"
resp.status = code
if code > 399:
- #return bottle.abort(code, content)
- return;
+ #return bottle.abort(code, content)
+ return
return content
-class Controller(wsgi.Controller):
+
+class AuthController(wsgi.Controller):
def __init__(self, options):
self.options = options
@@ -104,12 +118,183 @@ class Controller(wsgi.Controller):
creds = get_normalized_request_content(auth.PasswordCredentials, req)
return send_result(200, req, service.authenticate(creds))
- def validate_token(self, req):
+ def validate_token(self, req, token_id):
belongs_to = None
if "belongsTo" in req.GET:
belongs_to = req.GET["belongsTo"]
- rval = service.validate_token(get_auth_token(), token_id, belongs_to)
- return send_result(200, rval)
+ rval = service.validate_token(get_auth_token(req), token_id, belongs_to)
+ return send_result(200, req, rval)
+
+ def delete_token(self, req, token_id):
+ return send_result(204, req, service.revoke_token(get_auth_token(req), token_id))
+
+
+class TenantController(wsgi.Controller):
+
+ def __init__(self, options):
+ self.options = options
+
+ def create_tenant(self, req):
+ tenant = get_normalized_request_content(tenants.Tenant, req)
+ return send_result(201, req,
+ service.create_tenant(get_auth_token(req), tenant))
+
+ def get_tenants(self, req):
+ marker = None
+ if "marker" in req.GET:
+ marker = req.GET["marker"]
+
+ if "limit" in req.GET:
+ limit = req.GET["limit"]
+ else:
+ limit = 10
+
+ url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],
+ req.environ.get("SERVER_NAME"),
+ req.environ.get("SERVER_PORT"),
+ req.environ['PATH_INFO'])
+
+ tenants = service.get_tenants(get_auth_token(req), marker, limit, url)
+ return send_result(200, req, tenants)
+
+ def get_tenant(self, req, tenant_id):
+ tenant = service.get_tenant(get_auth_token(req), tenant_id)
+ return send_result(200, req, tenant)
+
+ def update_tenant(self, req, tenant_id):
+ tenant = get_normalized_request_content(tenants.Tenant, req)
+ rval = service.update_tenant(get_auth_token(req), tenant_id, tenant)
+ return send_result(200, req, rval)
+
+ def delete_tenant(self, req, tenant_id):
+ rval = service.delete_tenant(get_auth_token(req), tenant_id)
+ return send_result(204, req, rval)
+
+
+
+ # Tenant Group Methods
+
+ def create_tenant_group(self, req, tenant_id):
+ group = get_normalized_request_content(tenants.Group, req)
+ return send_result(201, req,
+ service.create_tenant_group(get_auth_token(req), \
+ tenant_id, group))
+
+ def get_tenant_groups(self, req, tenant_id):
+ marker = None
+ if "marker" in req.GET:
+ marker = req.GET["marker"]
+
+ if "limit" in req.GET:
+ limit = req.GET["limit"]
+ else:
+ limit = 10
+
+ url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],
+ req.environ.get("SERVER_NAME"),
+ req.environ.get("SERVER_PORT"),
+ req.environ['PATH_INFO'])
+
+ groups = service.get_tenant_groups(get_auth_token(req),
+ tenant_id, marker, limit, url)
+ return send_result(200, req, groups)
+
+ def get_tenant_group(self, req, tenant_id, group_id):
+ tenant = service.get_tenant_group(get_auth_token(req), tenant_id,
+ group_id)
+ return send_result(200, req, tenant)
+
+
+ def update_tenant_group(self, req, tenant_id, group_id):
+ group = get_normalized_request_content(tenants.Group, req)
+ rval = service.update_tenant_group(get_auth_token(req),\
+ tenant_id, group_id, group)
+ return send_result(200, req, rval)
+
+ def delete_tenant_group(self, req, tenant_id, group_id):
+ rval = service.delete_tenant_group(get_auth_token(req), tenant_id,
+ group_id)
+ return send_result(204, req, rval)
+
+ def add_user_tenant_group(self, req, tenant_id, group_id, user_id):
+ # TBD
+ # IDMDevguide clarification needed on this property
+ return None
+
+ def delete_user_tenant_group(self, req, tenant_id, group_id, user_id):
+ # TBD
+ # IDMDevguide clarification needed on this property
+ return None
+ def get_user_tenant_group(self, req, tenant_id, group_id, user_id):
+ # TBD
+ # IDMDevguide clarification needed on this property
+ return None
+
+class UserController(wsgi.Controller):
+
+ def __init__(self, options):
+ self.options = options
+
+ def create_user(self, req, tenant_id):
+ user = get_normalized_request_content(users.User, req)
+ return send_result(201, req,
+ service.create_user(get_auth_token(req), tenant_id, user))
+
+ def get_tenant_users(self, req, tenant_id):
+ marker = None
+ if "marker" in req.GET:
+ marker = req.GET["marker"]
+ if "limit" in req.GET:
+ limit = req.GET["limit"]
+ else:
+ limit = 10
+ url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],
+ req.environ.get("SERVER_NAME"),
+ req.environ.get("SERVER_PORT"),
+ req.environ['PATH_INFO'])
+ users = service.get_tenant_users(get_auth_token(req), tenant_id, marker, limit, url)
+ return send_result(200, req, users)
+
+ def get_user_groups(self, req, tenant_id, user_id):
+ marker = None
+ if "marker" in req.GET:
+ marker = req.GET["marker"]
+
+ if "limit" in req.GET:
+ limit = req.GET["limit"]
+ else:
+ limit = 10
+
+ url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],\
+ req.environ.get("SERVER_NAME"),\
+ req.environ.get("SERVER_PORT"),\
+ req.environ['PATH_INFO'])
+
+ groups = service.get_user_groups(get_auth_token(),\
+ tenant_id,user_id, marker, limit,url)
+ return send_result(200, groups)
+
+ def get_user(self, req, tenant_id, user_id):
+ user = service.get_user(get_auth_token(req), tenant_id, user_id)
+ return send_result(200, req, user)
+
+ def update_user(self, req, user_id, tenant_id):
+ user = get_normalized_request_content(users.User_Update, req)
+ rval = service.update_user(get_auth_token(req), user_id, user, tenant_id)
+ return send_result(200, req, rval)
+
+ def delete_user(self, req, user_id, tenant_id):
+ rval = service.delete_user(get_auth_token(req), user_id, tenant_id)
+ return send_result(204, req, rval)
+
+ def set_user_password(self, req, user_id, tenant_id):
+ user = get_normalized_request_content(users.User_Update, req)
+ rval = service.set_user_password(get_auth_token(req), user_id, user, tenant_id)
+ return send_result(204, req, rval)
+
+ def set_user_enabled(self, req, user_id,tenant_id):
+ rval = service.enable_disable_user(get_auth_token(req), user_id, tenant_id)
+ return send_result(204, req, rval)
class Auth_API(wsgi.Router):
@@ -118,10 +303,55 @@ class Auth_API(wsgi.Router):
def __init__(self, options):
self.options = options
mapper = routes.Mapper()
- controller = Controller(options)
- mapper.connect("/v1.0/token", controller=controller, action="authenticate")
- mapper.connect("/v1.0/token/{id}", controller=controller,
- action="validate_token")
+
+ # Token Operations
+ auth_controller = AuthController(options)
+ mapper.connect("/v1.0/token", controller=auth_controller, action="authenticate")
+ mapper.connect("/v1.0/token/{token_id}", controller=auth_controller,
+ action="validate_token", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/token/{token_id}", controller=auth_controller,
+ action="delete_token", conditions=dict(method=["DELETE"]))
+
+ # Tenant Operations
+ tenant_controller = TenantController(options)
+ mapper.connect("/v1.0/tenants", controller=tenant_controller,
+ action="create_tenant", conditions=dict(method=["POST"]))
+ mapper.connect("/v1.0/tenants", controller=tenant_controller,
+ action="get_tenants", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller,
+ action="get_tenant", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller,
+ action="update_tenant", conditions=dict(method=["PUT"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}", controller=tenant_controller,
+ action="delete_tenant", conditions=dict(method=["DELETE"]))
+
+ # Tenant Group Operations
+
+ mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller,
+ action="create_tenant_group", conditions=dict(method=["POST"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/groups", controller=tenant_controller,
+ action="get_tenant_groups", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller,
+ action="get_tenant_group", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller,
+ action="update_tenant_group", conditions=dict(method=["PUT"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/groups/{group_id}", controller=tenant_controller,
+ action="delete_tenant_group", conditions=dict(method=["DELETE"]))
+
+ # User Operations
+ user_controller = UserController(options)
+ mapper.connect("/v1.0/tenants/{tenant_id}/users", controller=user_controller,
+ action="create_user", conditions=dict(method=["POST"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/users", controller=user_controller,
+ action="get_tenant_users", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller,
+ action="get_user", conditions=dict(method=["GET"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller,
+ action="update_user", conditions=dict(method=["PUT"]))
+ mapper.connect("/v1.0/tenants/{tenant_id}/users/{user_id}", controller=user_controller,
+ action="delete_user", conditions=dict(method=["DELETE"]))
+
+
super(Auth_API, self).__init__(mapper)
diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py
index 47768c17..751721f5 100644
--- a/keystone/db/sqlalchemy/api.py
+++ b/keystone/db/sqlalchemy/api.py
@@ -17,7 +17,7 @@
from session import get_session
-from sqlalchemy.orm import joinedload
+from sqlalchemy.orm import joinedload,aliased
import models
@@ -44,7 +44,7 @@ def tenant_get_all(session=None):
def tenant_get_page(marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Tenant).filter("id>:marker").params(\
marker = '%s' % marker).order_by\
@@ -53,8 +53,8 @@ def tenant_get_page(marker,limit,session=None):
return session.query(models.Tenant).order_by(\
models.Tenant.id.desc()).limit(limit).all()
#return session.query(models.Tenant).all()
-
-
+
+
def tenant_get_page_markers(marker,limit,session=None):
if not session:
session = get_session()
@@ -120,7 +120,7 @@ def tenant_group_is_empty( id, session=None):
group_id=id).first()
if a_user != None:
return False
-
+
return True
def tenant_delete(id, session=None):
@@ -142,13 +142,13 @@ def tenant_group_get(id, tenant, session=None):
if not session:
session = get_session()
result = session.query(models.Group).filter_by(id=id, tenant_id=tenant).first()
-
+
return result
def tenant_group_get_page(tenantId, marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker = '%s' % marker).filter_by(\
@@ -158,8 +158,8 @@ def tenant_group_get_page(tenantId, marker,limit,session=None):
return session.query(models.Group).filter_by(tenant_id=tenantId)\
.order_by(models.Group.id.desc()).limit(limit).all()
#return session.query(models.Tenant).all()
-
-
+
+
def tenant_group_get_page_markers(tenantId, marker,limit,session=None):
if not session:
session = get_session()
@@ -213,6 +213,13 @@ def tenant_group_delete(id,tenant_id, session=None):
tenantgroup_ref = tenant_group_get(id,tenant_id, session)
session.delete(tenantgroup_ref)
+def user_get_by_group(user_id, group_id, session=None):
+ if not session:
+ session = get_session()
+ result = session.query(models.UserGroupAssociation).filter_by(
+ group_id=group_id, user_id=user_id).first()
+ return result
+
def user_create(values):
user_ref = models.User()
@@ -229,14 +236,6 @@ def user_get(id, session=None):
return result
-def user_get_by_tenant(tenant_id, session=None):
- if not session:
- session = get_session()
- result = session.query(models.UserTenantAssociation).filter_by(
- tenant_id=tenant_id)
- return result
-
-
def user_groups(id, session=None):
if not session:
session = get_session()
@@ -254,14 +253,6 @@ def user_update(id, values, session=None):
user_ref.save(session=session)
-def user_delete(id, session=None):
- if not session:
- session = get_session()
- with session.begin():
- user_ref = user_get(id, session)
- session.delete(user_ref)
-
-
def group_get(id, session=None):
if not session:
session = get_session()
@@ -272,43 +263,43 @@ def group_get(id, session=None):
def group_users(id, session=None):
if not session:
session = get_session()
- result = session.query(models.Users).filter_by(
+ result = session.query(models.User).filter_by(
group_id=id)
return result
def users_tenant_group_get_page(group_id, marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
- return session.query(models.Users).filter_by(\
+ return session.query(models.User).filter_by(\
group_id=group_id).filter("id>:marker").params(\
marker = '%s' % marker).order_by\
- (models.Users.id.desc()).limit(limit).all()
+ (models.User.id.desc()).limit(limit).all()
else:
- return session.query(models.Users).filter_by(\
+ return session.query(models.User).filter_by(\
group_id=group_id).order_by(\
- models.Users.id.desc()).limit(limit).all()
-
-
-
+ models.User.id.desc()).limit(limit).all()
+
+
+
def users_tenant_group_get_page_markers(group_id, marker,limit,session=None):
if not session:
session = get_session()
- first = session.query(models.Users).order_by(\
- models.Users.id).first()
- last = session.query(models.Users).order_by(\
- models.Users.id.desc()).first()
+ first = session.query(models.User).order_by(\
+ models.User.id).first()
+ last = session.query(models.User).order_by(\
+ models.User.id.desc()).first()
if marker is None:
marker=first.id
- next=session.query(models.Users).filter_by(\
+ next=session.query(models.User).filter_by(\
group_id=group_id).filter("id > :marker").params(\
marker = '%s' % marker).order_by(\
- models.Users.id).limit(limit).all()
- prev=session.query(models.Users).filter_by(\
+ models.User.id).limit(limit).all()
+ prev=session.query(models.User).filter_by(\
group_id=group_id).filter("id < :marker").params(\
marker = '%s' % marker).order_by(\
- models.Users.id.desc()).limit(int(limit)).all()
+ models.User.id.desc()).limit(int(limit)).all()
if len(next) == 0:
next=last
else:
@@ -339,7 +330,7 @@ def group_get_all(session=None):
def group_get_page(marker,limit,session=None):
if not session:
session = get_session()
-
+
if marker:
return session.query(models.Group).filter("id>:marker").params(\
marker = '%s' % marker).order_by\
@@ -347,9 +338,9 @@ def group_get_page(marker,limit,session=None):
else:
return session.query(models.Group).order_by(\
models.Group.id.desc()).limit(limit).all()
-
-
-
+
+
+
def group_get_page_markers(marker,limit,session=None):
if not session:
session = get_session()
@@ -422,3 +413,206 @@ def token_for_user(user_id, session=None):
result = session.query(models.Token).filter_by(
user_id=user_id).order_by("expires desc").first()
return result
+
+def user_tenant_create(values):
+ user_tenant_ref = models.UserTenantAssociation()
+ user_tenant_ref.update(values)
+ user_tenant_ref.save()
+ return user_tenant_ref
+
+def user_get_update(id, session=None):
+ if not session:
+ session = get_session()
+ result = session.query(models.User).filter_by(id=id).first()
+ return result
+
+def user_get_email(email, session=None):
+ if not session:
+ session = get_session()
+ result = session.query(models.User).filter_by(email=email).first()
+ return result
+
+def users_get_by_tenant_get_page(tenant_id, marker, limit, session=None):
+ if not session:
+ session = get_session()
+ uta = aliased(models.UserTenantAssociation)
+ user = aliased(models.User)
+ if marker:
+ return session.query(user, uta).join(
+ (uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).\
+ filter("id>=:marker").params(
+ marker='%s' % marker).order_by(
+ user.id).limit(limit).all()
+ else:
+ return session.query(user, uta).\
+ join((uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).order_by(
+ user.id).limit(limit).all()
+
+def users_get_by_tenant_get_page_markers(tenant_id, marker, limit, session=None):
+ if not session:
+ session = get_session()
+ uta = aliased(models.UserTenantAssociation)
+ user = aliased(models.User)
+ first, firstassoc = session.query(user, uta).\
+ join((uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).\
+ order_by(user.id).first()
+ last, lastassoc = session.query(user, uta).\
+ join((uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).\
+ order_by(user.id.desc()).first()
+ if marker is None:
+ marker = first.id
+ next = session.query(user, uta).join((uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).\
+ filter("id >= :marker").params(
+ marker='%s' % marker).order_by(
+ user.id).limit(int(limit) + 1).all()
+ prev = session.query(user, uta).join((uta, uta.user_id == user.id)).\
+ filter(uta.tenant_id == tenant_id).\
+ filter("id < :marker").params(
+ marker='%s' % marker).order_by(
+ user.id.desc()).limit(int(limit)).all()
+ next_len = len(next)
+ prev_len = len(prev)
+ print next_len, prev_len
+ if next_len == 0:
+ next = last
+ else:
+ for t, a in next:
+ next = t
+ if prev_len == 0:
+ prev = first
+ else:
+ for t, a in prev:
+ prev = t
+ if first.id == marker:
+ prev = None
+ else:
+ prev = prev.id
+ if marker == last.id:
+ next = None
+ else:
+ next = next.id
+ return (prev, next)
+
+def user_groups_get_all(user_id, session=None):
+ if not session:
+ session = get_session()
+ uga = aliased(models.UserGroupAssociation)
+ group = aliased(models.Group)
+ return session.query(group, uga).\
+ join((uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).order_by(
+ group.id).all()
+
+def groups_get_by_user_get_page(user_id, marker, limit, session=None):
+ if not session:
+ session = get_session()
+ uga = aliased(models.UserGroupAssociation)
+ group = aliased(models.Group)
+ if marker:
+ return session.query(group, uga).join(
+ (uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).\
+ filter("id>=:marker").params(
+ marker='%s' % marker).order_by(
+ group.id).limit(limit).all()
+ else:
+ return session.query(group, uga).\
+ join((uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).order_by(
+ group.id).limit(limit).all()
+def groups_get_by_user_get_page_markers(user_id, marker, limit, session=None):
+ if not session:
+ session = get_session()
+ uga = aliased(models.UserGroupAssociation)
+ group = aliased(models.Group)
+ first, firstassoc = session.query(group, uga).\
+ join((uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).\
+ order_by(group.id).first()
+ last, lastassoc = session.query(group, uga).\
+ join((uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).\
+ order_by(group.id.desc()).first()
+ if marker is None:
+ marker = first.id
+ next = session.query(group, uga).join(
+ (uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).\
+ filter("id>=:marker").params(
+ marker='%s' % marker).order_by(
+ group.id).limit(int(limit)).all()
+
+
+ prev = session.query(group, uga).join(
+ (uga, uga.group_id == group.id)).\
+ filter(uga.user_id == user_id).\
+ filter("id < :marker").params(
+ marker='%s' % marker).order_by(
+ group.id).limit(int(limit) + 1).all()
+ next_len = len(next)
+ prev_len = len(prev)
+ print next_len, prev_len
+ if next_len == 0:
+ next = last
+ else:
+ for t, a in next:
+ next = t
+ if prev_len == 0:
+ prev = first
+ else:
+ for t, a in prev:
+ prev = t
+ if first.id == marker:
+ prev = None
+ else:
+ prev = prev.id
+ if marker == last.id:
+ next = None
+ else:
+ next = next.id
+ return (prev, next)
+
+
+def user_delete(id, session=None):
+ if not session:
+ session = get_session()
+ with session.begin():
+ user_ref = user_get(id, session)
+ session.delete(user_ref)
+
+def user_get_by_tenant(id, tenant_id, session=None):
+ if not session:
+ session = get_session()
+ user_tenant = session.query(models.UserTenantAssociation).filter_by(
+ tenant_id=tenant_id, user_id=id).first()
+
+ return user_tenant
+
+def user_delete_tenant(id, tenantId, session=None):
+ if not session:
+ session = get_session()
+ with session.begin():
+ user_ref = user_get_by_tenant(id, tenantId, session)
+ session.delete(user_ref)
+ user_ref = user_get(id, session)
+ session.delete(user_ref)
+
+def user_tenant_group(values):
+ user_ref = models.UserGroupAssociation()
+ user_ref.update(values)
+ user_ref.save()
+ return user_ref
+
+
+def user_tenant_group_delete(id, group_id, session=None):
+ if not session:
+ session = get_session()
+ with session.begin():
+ usertenantgroup_ref = user_get_by_group(id, group_id, session)
+ session.delete(usertenantgroup_ref)
+
diff --git a/keystone/logic/service.py b/keystone/logic/service.py
index ef76867b..cc852af0 100644
--- a/keystone/logic/service.py
+++ b/keystone/logic/service.py
@@ -20,6 +20,7 @@ import keystone.logic.types.auth as auth
import keystone.logic.types.tenant as tenants
import keystone.logic.types.atom as atom
import keystone.logic.types.fault as fault
+import keystone.logic.types.user as users
import keystone.db.sqlalchemy.api as db_api
import keystone.db.sqlalchemy.models as db_models
@@ -124,12 +125,12 @@ class IDMService(object):
# dtenant.desc, dtenant.enabled))
# return tenants.Tenants(ts, [])
-
-
+
+
##
## GET Tenants with Pagination
##
-
+
def get_tenants(self, admin_token, marker, limit, url):
self.__validate_token(admin_token)
@@ -142,13 +143,13 @@ class IDMService(object):
links=[]
if prev:
links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
+ if next:
links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+
+
return tenants.Tenants(ts, links)
-
+
def get_tenant(self, admin_token, tenant_id):
self.__validate_token(admin_token)
@@ -188,11 +189,11 @@ class IDMService(object):
db_api.tenant_delete(dtenant.id)
return None
-
+
#
# Tenant Group Operations
#
-
+
def create_tenant_group(self, admin_token, tenant, group):
self.__validate_token(admin_token)
@@ -201,7 +202,7 @@ class IDMService(object):
if tenant == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
-
+
dtenant = db_api.tenant_get(tenant)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
@@ -209,7 +210,7 @@ class IDMService(object):
if group.group_id == None:
raise fault.BadRequestFault("Expecting a Group Id")
-
+
if db_api.group_get(group.group_id) != None:
raise fault.TenantGroupConflictFault(
"A tenant group with that id already exists")
@@ -223,20 +224,20 @@ class IDMService(object):
return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
-
-
+
+
def get_tenant_groups(self, admin_token, tenantId, marker, limit, url):
self.__validate_token(admin_token)
if tenantId == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
-
+
dtenant = db_api.tenant_get(tenantId)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
ts = []
dtenantgroups = db_api.tenant_group_get_page(tenantId, marker,limit)
-
+
for dtenantgroup in dtenantgroups:
ts.append(tenants.Group(dtenantgroup.id,
dtenantgroup.desc, dtenantgroup.tenant_id))
@@ -244,27 +245,27 @@ class IDMService(object):
links=[]
if prev:
links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
+ if next:
links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+
+
return tenants.Groups(ts, links)
-
+
def get_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_token(admin_token)
-
+
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
-
-
+
+
return tenants.Group(dtenant.id, dtenant.desc, dtenant.tenant_id)
-
-
+
+
def update_tenant_group(self, admin_token, tenant_id, group_id, group):
self.__validate_token(admin_token)
@@ -275,32 +276,32 @@ class IDMService(object):
dtenant = db_api.tenant_get(tenant_id)
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
-
+
if group_id != group.group_id:
raise fault.BadRequestFault("Wrong Data Provided,Group id not matching")
-
+
if str(tenant_id) != str(group.tenant_id):
- raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ")
-
+ raise fault.BadRequestFault("Wrong Data Provided, Tenant id not matching ")
+
values = {'desc': group.description}
db_api.tenant_group_update(group_id, tenant_id, values)
return tenants.Group(group_id, group.description, tenant_id)
-
+
def delete_tenant_group(self, admin_token, tenant_id, group_id):
self.__validate_token(admin_token)
dtenant = db_api.tenant_get(tenant_id)
-
+
if dtenant == None:
raise fault.ItemNotFoundFault("The tenant not found")
-
+
dtenant = db_api.tenant_group_get(group_id, tenant_id)
if not dtenant:
raise fault.ItemNotFoundFault("The tenant group not found")
@@ -311,9 +312,10 @@ class IDMService(object):
db_api.tenant_group_delete(group_id, tenant_id)
return None
-
-
- def get_users_tenant_group(self, admin_token, tenantId, groupId, marker, limit, url):
+
+
+ def get_users_tenant_group(self, admin_token, tenantId, groupId, marker,
+ limit, url):
self.__validate_token(admin_token)
if tenantId == None:
raise fault.BadRequestFault("Expecting a Tenant Id")
@@ -324,24 +326,78 @@ class IDMService(object):
if db_api.tenant_group_get(groupId, tenantId) == None:
raise fault.ItemNotFoundFault(
"A tenant group with that id not found")
-
ts = []
-
- dgroupusers = db_api.users_tenant_group_get_page( groupId, marker,limit)
- for dgroupuser in dgroupusers:
+ dgroupusers = db_api.users_tenant_group_get_page(groupId, marker,
+ limit)
+ for dgroupuser, dgroupuserAsso in dgroupusers:
+
ts.append(tenants.User(dgroupuser.id,
- dtenantgroup.email, tenantId, dtenantgroup.enabled))
- prev,next=db_api.users_tenant_group_get_page_markers( groupId, marker, limit)
- links=[]
- if prev:
- links.append(atom.Link('prev',"%s?'marker=%s&limit=%s'" % (url,prev,limit)))
- if next:
- links.append(atom.Link('next',"%s?'marker=%s&limit=%s'" % (url,next,limit)))
-
-
+ dgroupuser.email, dgroupuser.enabled,
+ tenantId, None))
+ links = []
+ if ts.__len__():
+ prev, next = db_api.users_tenant_group_get_page_markers(groupId,
+ marker, limit)
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
+ (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
+ (url, next, limit)))
return tenants.Users(ts, links)
+ def add_user_tenant_group(self, admin_token, tenant, group, user):
+ self.__validate_token(admin_token)
+
+ if db_api.tenant_get(tenant) == None:
+ raise fault.ItemNotFoundFault("The Tenant not found")
+
+ if db_api.group_get(group) == None:
+ raise fault.ItemNotFoundFault("The Group not found")
+ duser = db_api.user_get(user)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The User not found")
+
+ if db_api.tenant_group_get(group, tenant) == None:
+ raise fault.ItemNotFoundFault("A tenant group with"
+ " that id not found")
+
+ if db_api.user_get_by_group(user, group) != None:
+ raise fault.UserGroupConflictFault(
+ "A user with that id already exists in group")
+
+ dusergroup = db_models.UserGroupAssociation()
+ dusergroup.user_id = user
+ dusergroup.group_id = group
+ db_api.user_tenant_group(dusergroup)
+
+ return tenants.User(duser.id, duser.email, duser.enabled,
+ tenant, group)
+ def delete_user_tenant_group(self, admin_token, tenant, group, user):
+ self.__validate_token(admin_token)
+
+ if db_api.tenant_get(tenant) == None:
+ raise fault.ItemNotFoundFault("The Tenant not found")
+
+ if db_api.group_get(group) == None:
+ raise fault.ItemNotFoundFault("The Group not found")
+ duser = db_api.user_get(user)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The User not found")
+
+ if db_api.tenant_group_get(group, tenant) == None:
+ raise fault.ItemNotFoundFault("A tenant group with"
+ " that id not found")
+
+ if db_api.user_get_by_group(user, group) == None:
+ raise fault.ItemNotFoundFault("A user with that id "
+ "in a group not found")
+
+ db_api.user_tenant_group_delete(user, group)
+ return None
+
+
#
# Private Operations
#
@@ -356,6 +412,442 @@ class IDMService(object):
user = db_api.user_get(token.user_id)
return (token, user)
+ #
+ # User Operations
+ #
+ def create_user(self, admin_token, tenant_id, user):
+ self.__validate_token(admin_token)
+
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ if not isinstance(user, users.User):
+ raise fault.BadRequestFault("Expecting a User")
+
+ if user.user_id == None:
+ raise fault.BadRequestFault("Expecting a unique User Id")
+
+ if db_api.user_get(user.user_id) != None:
+ raise fault.UserConflictFault(
+ "An user with that id already exists")
+
+ if db_api.user_get_email(user.email) != None:
+ raise fault.EmailConflictFault(
+ "Email already exists")
+
+
+ duser_tenant=db_models.UserTenantAssociation()
+ duser_tenant.user_id=user.user_id
+ duser_tenant.tenant_id=tenant_id
+ db_api.user_tenant_create(duser_tenant)
+
+ duser = db_models.User()
+ duser.id = user.user_id
+ duser.password = user.password
+ duser.email = user.email
+ duser.enabled = user.enabled
+ db_api.user_create(duser)
+
+ return user
+
+ def get_tenant_users(self, admin_token, tenant_id, marker, limit,url):
+ self.__validate_token(admin_token)
+
+ if tenant_id == None:
+ raise fault.BadRequestFault("Expecting a Tenant Id")
+
+ if db_api.tenant_get(tenant_id) == None:
+ raise fault.ItemNotFoundFault("The tenant not found")
+ ts = []
+ dtenantusers = db_api.users_get_by_tenant_get_page(tenant_id, marker,
+ limit)
+ for dtenantuser, dtenantuserAsso in dtenantusers:
+ ts.append(users.User(None,dtenantuser.id,tenant_id,
+ dtenantuser.email, dtenantuser.enabled))
+ links = []
+ if ts.__len__():
+ prev, next =db_api.users_get_by_tenant_get_page_markers(tenant_id,
+ marker, limit)
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
+ (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
+ (url, next, limit)))
+ return users.Users(ts, links)
+
+ def get_user(self, admin_token, tenant_id, user_id):
+ self.__validate_token(admin_token)
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ duser = db_api.user_get(user_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ if not duser.enabled:
+ raise fault.UserDisabledFault("User has been disabled")
+
+ if len(duser.tenants) > 0:
+ tenant_user = duser.tenants[0].tenant_id
+ else:
+ tenant_user = tenant_id
+
+ ts = []
+ dusergroups = db_api.user_groups_get_all(user_id)
+
+ for dusergroup, dusergroupAsso in dusergroups:
+
+
+ ts.append(tenants.Group(dusergroup.id,dusergroup.tenant_id,None))
+
+ return users.User_Update(None,duser.id, tenant_user, duser.email, \
+ duser.enabled,ts )
+
+ def update_user(self, admin_token, user_id, user,tenant_id):
+ self.__validate_token(admin_token)
+
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ duser = db_api.user_get(user_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ if not duser.enabled:
+ raise fault.UserDisabledFault("User has been disabled")
+
+
+ if not isinstance(user, users.User):
+ raise fault.BadRequestFault("Expecting a User")
+ True
+ duser = db_api.user_get_update(user_id)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The user could not be found")
+ if db_api.user_get_email(user.email) != None:
+ raise fault.EmailConflictFault(
+ "Email already exists")
+
+ values = {'email': user.email}
+
+ db_api.user_update(user_id, values)
+ duser = db_api.user_get_update(user_id)
+ return users.User(duser.password, duser.id, tenant_id, duser.email, \
+ duser.enabled)
+
+ def set_user_password(self, admin_token, user_id, user,tenant_id):
+ self.__validate_token(admin_token)
+
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ duser = db_api.user_get(user_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ if not duser.enabled:
+ raise fault.UserDisabledFault("User has been disabled")
+
+
+ if not isinstance(user, users.User):
+ raise fault.BadRequestFault("Expecting a User")
+ True
+ duser = db_api.user_get(user_id)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ values = {'password': user.password}
+
+ db_api.user_update(user_id, values)
+
+ return users.User(user.password, '', '', '', '')
+
+ def enable_disable_user(self, admin_token, user_id, user,tenant_id):
+ self.__validate_token(admin_token)
+
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ duser = db_api.user_get(user_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ if not duser.enabled:
+ raise fault.UserDisabledFault("User has been disabled")
+
+
+ if not isinstance(user, users.User):
+ raise fault.BadRequestFault("Expecting a User")
+ True
+ duser = db_api.user_get(user_id)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The user could not be found")
+
+ values = {'enabled': user.enabled}
+
+ db_api.user_update(user_id, values)
+
+ return users.User('','','','',user.enabled)
+
+ def delete_user(self, admin_token, user_id, tenant_id):
+ self.__validate_token(admin_token)
+ dtenant = db_api.tenant_get(tenant_id)
+ if dtenant == None:
+ raise fault.UnauthorizedFault("Unauthorized")
+ if not dtenant.enabled:
+ raise fault.TenantDisabledFault("Your account has been disabled")
+
+ duser = db_api.user_get(user_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be found")
+ duser = db_api.user_get_by_tenant(user_id, tenant_id)
+ if not duser:
+ raise fault.ItemNotFoundFault("The user could not be "
+ "found under given tenant")
+
+ db_api.user_delete_tenant(user_id, tenant_id)
+ return None
+
+ def get_user_groups(self, admin_token, tenant_id,user_id, marker, limit, url):
+ self.__validate_token(admin_token)
+
+ if tenant_id == None:
+ raise fault.BadRequestFault("Expecting a Tenant Id")
+
+ if db_api.tenant_get(tenant_id) == None:
+ raise fault.ItemNotFoundFault("The tenant not found")
+
+ ts = []
+ dusergroups = db_api.groups_get_by_user_get_page(user_id, marker,
+ limit)
+ print dusergroups
+ for dusergroup, dusergroupAsso in dusergroups:
+
+
+ ts.append(tenants.Group(dusergroup.id,dusergroup.desc,dusergroup.tenant_id))
+ links = []
+ if ts.__len__():
+ prev, next =db_api.groups_get_by_user_get_page_markers(user_id, marker,
+ limit)
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
+ (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
+ (url, next, limit)))
+ return tenants.Groups(ts, links)
+
+ #
+
+ #
+ # Global Group Operations
+ #
+
+ def __check_create_global_tenant(self):
+
+ dtenant = db_api.tenant_get('GlobalTenant')
+
+ if dtenant is None:
+ dtenant = db_models.Tenant()
+ dtenant.id = 'GlobalTenant'
+ dtenant.desc = 'GlobalTenant is Default tenant for global groups'
+ dtenant.enabled = True
+ db_api.tenant_create(dtenant)
+ return dtenant
+
+ def create_global_group(self, admin_token, group):
+ self.__validate_token(admin_token)
+
+ if not isinstance(group, tenants.Group):
+ raise fault.BadRequestFault("Expecting a Group")
+
+ if group.group_id == None:
+ raise fault.BadRequestFault("Expecting a Group Id")
+
+ if db_api.group_get(group.group_id) != None:
+ raise fault.TenantGroupConflictFault(
+ "A tenant group with that id already exists")
+ gtenant = self.__check_create_global_tenant()
+ dtenant = db_models.Group()
+ dtenant.id = group.group_id
+ dtenant.desc = group.description
+ dtenant.tenant_id = gtenant.id
+ db_api.tenant_group_create(dtenant)
+ return tenants.Group(dtenant.id, dtenant.desc, None)
+
+ def get_global_groups(self, admin_token, marker, limit, url):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+ ts = []
+ dtenantgroups = db_api.tenant_group_get_page(gtenant.id, \
+ marker, limit)
+
+ for dtenantgroup in dtenantgroups:
+ ts.append(tenants.Group(dtenantgroup.id,
+ dtenantgroup.desc))
+ prev, next = db_api.tenant_group_get_page_markers(gtenant.id,
+ marker, limit)
+ links = []
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" %
+ (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" %
+ (url, next, limit)))
+ return tenants.Groups(ts, links)
+
+ def get_global_group(self, admin_token, group_id):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+ dtenant = db_api.tenant_get(gtenant.id)
+ if dtenant == None:
+ raise fault.ItemNotFoundFault("The Global tenant not found")
+
+ dtenant = db_api.tenant_group_get(group_id, gtenant.id)
+ if not dtenant:
+ raise fault.ItemNotFoundFault("The Global tenant group not found")
+ return tenants.Group(dtenant.id, dtenant.desc)
+
+ def update_global_group(self, admin_token, group_id, group):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+ if not isinstance(group, tenants.Group):
+ raise fault.BadRequestFault("Expecting a Group")
+
+ dtenant = db_api.tenant_get(gtenant.id)
+ if dtenant == None:
+ raise fault.ItemNotFoundFault("The global tenant not found")
+
+ dtenant = db_api.tenant_group_get(group_id, gtenant.id)
+ if not dtenant:
+ raise fault.ItemNotFoundFault("The Global tenant group not found")
+ if group_id != group.group_id:
+ raise fault.BadRequestFault("Wrong Data Provided,"
+ "Group id not matching")
+
+ values = {'desc': group.description}
+ db_api.tenant_group_update(group_id, gtenant.id, values)
+ return tenants.Group(group_id, group.description, gtenant.id)
+
+ def delete_global_group(self, admin_token, group_id):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+ dtenant = db_api.tenant_get(gtenant.id)
+
+ if dtenant == None:
+ raise fault.ItemNotFoundFault("The global tenant not found")
+
+ dtenant = db_api.tenant_group_get(group_id, gtenant.id)
+ if not dtenant:
+ raise fault.ItemNotFoundFault("The global tenant group not found")
+
+ if not db_api.tenant_group_is_empty(group_id):
+ raise fault.ForbiddenFault("You may not delete a group that "
+ "contains users")
+
+ db_api.tenant_group_delete(group_id, gtenant.id)
+ return None
+
+ def get_users_global_group(self, admin_token, groupId, marker, limit, url):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+ if gtenant.id == None:
+ raise fault.BadRequestFault("Expecting a global Tenant")
+
+ if db_api.tenant_get(gtenant.id) == None:
+ raise fault.ItemNotFoundFault("The global tenant not found")
+
+ if db_api.tenant_group_get(groupId, gtenant.id) == None:
+ raise fault.ItemNotFoundFault(
+ "A global tenant group with that id not found")
+ ts = []
+ dgroupusers = db_api.users_tenant_group_get_page(groupId, marker,
+ limit)
+ for dgroupuser, dgroupuserassoc in dgroupusers:
+ ts.append(tenants.User(dgroupuser.id, dgroupuser.email,
+ dgroupuser.enabled))
+ links = []
+ if ts.__len__():
+ prev, next = db_api.users_tenant_group_get_page_markers(groupId,
+ marker, limit)
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'"
+ % (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'"
+ % (url, next, limit)))
+ return tenants.Users(ts, links)
+
+ def add_user_global_group(self, admin_token, group, user):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+
+ if db_api.tenant_get(gtenant.id) == None:
+ raise fault.ItemNotFoundFault("The Global Tenant not found")
+
+ if db_api.group_get(group) == None:
+ raise fault.ItemNotFoundFault("The Group not found")
+ duser = db_api.user_get(user)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The User not found")
+
+ if db_api.tenant_group_get(group, gtenant.id) == None:
+ raise fault.ItemNotFoundFault("A global tenant group with"
+ " that id not found")
+
+ if db_api.user_get_by_group(user, group) != None:
+ raise fault.UserGroupConflictFault(
+ "A user with that id already exists in group")
+
+ dusergroup = db_models.UserGroupAssociation()
+ dusergroup.user_id = user
+ dusergroup.group_id = group
+ db_api.user_tenant_group(dusergroup)
+
+ return tenants.User(duser.id, duser.email, duser.enabled,
+ group_id = group)
+
+ def delete_user_global_group(self, admin_token, group, user):
+ self.__validate_token(admin_token)
+ gtenant = self.__check_create_global_tenant()
+
+ if db_api.tenant_get(gtenant.id) == None:
+ raise fault.ItemNotFoundFault("The Global Tenant not found")
+
+ if db_api.group_get(group) == None:
+ raise fault.ItemNotFoundFault("The Group not found")
+ duser = db_api.user_get(user)
+ if duser == None:
+ raise fault.ItemNotFoundFault("The User not found")
+
+ if db_api.tenant_group_get(group, gtenant.id) == None:
+ raise fault.ItemNotFoundFault("A global tenant group with "
+ "that id not found")
+
+ if db_api.user_get_by_group(user, group) == None:
+ raise fault.ItemNotFoundFault("A user with that id in a "
+ "group not found")
+
+ db_api.user_tenant_group_delete(user, group)
+ return None
+
+ #
+
def __get_auth_data(self, dtoken, duser):
"""return AuthData object for a token/user pair"""
diff --git a/keystone/logic/types/fault.py b/keystone/logic/types/fault.py
index fd2e85cc..db8ef90d 100644
--- a/keystone/logic/types/fault.py
+++ b/keystone/logic/types/fault.py
@@ -79,14 +79,6 @@ class UnauthorizedFault(IDMFault):
self.key = "unauthorized"
-class UserDisabledFault(IDMFault):
- "The user is disabled"
-
- def __init__(self, msg, details=None, code=403):
- super(UserDisabledFault, self).__init__(msg, details, code)
- self.key = "userDisabled"
-
-
class ForbiddenFault(IDMFault):
"The user is forbidden"
@@ -94,6 +86,13 @@ class ForbiddenFault(IDMFault):
super(ForbiddenFault, self).__init__(msg, details, code)
self.key = "forbidden"
+class TenantDisabledFault(IDMFault):
+ "The tenant is disabled"
+
+ def __init__(self, msg, details=None, code=403):
+ super(TenantDisabledFault, self).__init__(msg, details, code)
+ self.key = "tenantDisabled"
+
class ItemNotFoundFault(IDMFault):
"The item is not found"
@@ -126,3 +125,31 @@ class OverlimitFault(IDMFault):
self.args = (code, msg, details, retry_at)
self.retry_at = retry_at
self.key = "overLimit"
+
+class UserConflictFault(IDMFault):
+ "The User already exists?"
+
+ def __init__(self, msg, details=None, code=409):
+ super(UserConflictFault, self).__init__(msg, details, code)
+ self.key = "userConflict"
+
+class UserDisabledFault(IDMFault):
+ "The user is disabled"
+
+ def __init__(self, msg, details=None, code=403):
+ super(UserDisabledFault, self).__init__(msg, details, code)
+ self.key = "userDisabled"
+
+class EmailConflictFault(IDMFault):
+ "The Email already exists?"
+
+ def __init__(self, msg, details=None, code=409):
+ super(EmailConflictFault, self).__init__(msg, details, code)
+ self.key = "emailConflict"
+
+class UserGroupConflictFault(IDMFault):
+ "The user already exists in group?"
+
+ def __init__(self, msg, details=None, code=409):
+ super(UserGroupConflictFault, self).__init__(msg, details, code)
+ self.key = "userGroupConflict"
diff --git a/keystone/logic/types/user.py b/keystone/logic/types/user.py
new file mode 100644
index 00000000..a2496806
--- /dev/null
+++ b/keystone/logic/types/user.py
@@ -0,0 +1,268 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import keystone.logic.types.fault as fault
+from lxml import etree
+import string
+
+
+class User(object):
+ "A user."
+
+ def __init__(self, password,user_id, tenant_id, email,enabled):
+ self.user_id = user_id
+ self.tenant_id = tenant_id
+ self.password = password
+ self.email = email
+ self.enabled = enabled and True or False
+
+ @staticmethod
+ def from_xml(xml_str):
+ try:
+ dom = etree.Element("root")
+ dom.append(etree.fromstring(xml_str))
+ root = dom.find("{http://docs.openstack.org/idm/api/v1.0}user")
+ if root == None:
+ raise fault.BadRequestFault("Expecting User")
+ user_id=root.get("id")
+ tenant_id = root.get("tenantId")
+ email = root.get("email")
+ password = root.get("password")
+ enabled = root.get("enabled")
+ if user_id == None:
+ raise fault.BadRequestFault("Expecting User")
+ elif tenant_id == None:
+ raise fault.BadRequestFault("Expecting User tenant")
+ elif password == None:
+ raise fault.BadRequestFault("Expecting User password")
+ elif email == None:
+ raise fault.BadRequestFault("Expecting User email")
+ if enabled == None or enabled == "true" or enabled == "yes":
+ set_enabled = True
+ elif enabled == "false" or enabled == "no":
+ set_enabled = False
+ else:
+ raise fault.BadRequestFault("Bad enabled attribute!")
+ if password == '':
+ password=user_id
+ return User(password,user_id,tenant_id,email,set_enabled)
+ except etree.LxmlError as e:
+ raise fault.BadRequestFault("Cannot parse User", str(e))
+
+ @staticmethod
+ def from_json(json_str):
+ try:
+ obj = json.loads(json_str)
+ print obj
+ if not "user" in obj:
+ raise fault.BadRequestFault("Expecting User")
+ user = obj["user"]
+ if not "id" in user:
+ user_id = None
+ else:
+ user_id = user["id"]
+ if not "password" in user:
+ raise fault.BadRequestFault("Expecting User Password")
+ password = user["password"]
+ if not "tenantId" in user:
+ raise fault.BadRequestFault("Expecting User Tenant")
+ tenant_id = user["tenantId"]
+ if not "email" in user:
+ raise fault.BadRequestFault("Expecting User Email")
+ email = user["email"]
+ if "enabled" in user:
+ set_enabled = user["enabled"]
+ if not isinstance(set_enabled, bool):
+ raise fault.BadRequestFault("Bad enabled attribute!")
+ else:
+ set_enabled=True
+ if password == '':
+ password=user_id
+ return User(password,user_id,tenant_id,email,set_enabled)
+ except (ValueError, TypeError) as e:
+ raise fault.BadRequestFault("Cannot parse Tenant", str(e))
+
+ def to_dom(self):
+ dom = etree.Element("user",
+ xmlns="http://docs.openstack.org/idm/api/v1.0")
+ if self.email:
+ dom.set("email", self.email)
+ if self.tenant_id:
+ dom.set("tenantId",self.tenant_id)
+ if self.user_id:
+ dom.set("id",self.user_id)
+ if self.enabled:
+ dom.set("enabled",string.lower(str(self.enabled)))
+ if self.password:
+ dom.set("password",self.password)
+
+
+ return dom
+
+ def to_xml(self):
+ print '34'
+ return etree.tostring(self.to_dom())
+
+ def to_dict(self):
+ user = {}
+
+ if self.user_id:
+ user["id"] = self.user_id
+ user["tenantId"]=self.tenant_id
+ if self.password:
+ user["password"]=self.password
+ user["email"]=self.email
+ user["enabled"]=self.enabled
+ return {'user': user}
+
+ def to_json(self):
+ return json.dumps(self.to_dict())
+
+class User_Update(object):
+ "A user."
+
+ def __init__(self, password,user_id, tenant_id, email,enabled, group=None):
+ self.user_id = user_id
+ self.tenant_id = tenant_id
+ self.password = password
+ self.email = email
+ self.enabled = enabled and True or False
+ if group is not None:
+ self.group=group
+ @staticmethod
+ def from_xml(xml_str):
+ try:
+ dom = etree.Element("root")
+ dom.append(etree.fromstring(xml_str))
+ root = dom.find("{http://docs.openstack.org/idm/api/v1.0}user")
+ if root == None:
+ raise fault.BadRequestFault("Expecting User")
+ user_id=root.get("id")
+ tenant_id = root.get("tenantId")
+ email = root.get("email")
+ password = root.get("password")
+ enabled = root.get("enabled")
+ if enabled == None or enabled == "true" or enabled == "yes":
+ set_enabled = True
+ elif enabled == "false" or enabled == "no":
+ set_enabled = False
+ else:
+ raise fault.BadRequestFault("Bad enabled attribute!")
+ if password == '':
+ password=user_id
+ return User(password,user_id,tenant_id,email,set_enabled)
+ except etree.LxmlError as e:
+ raise fault.BadRequestFault("Cannot parse User", str(e))
+
+ @staticmethod
+ def from_json(json_str):
+ try:
+ obj = json.loads(json_str)
+ print obj
+ if not "user" in obj:
+ raise fault.BadRequestFault("Expecting User")
+ user = obj["user"]
+ if not "id" in user:
+ user_id = None
+ else:
+ user_id = user["id"]
+ if not "password" in user:
+ password = None
+ else:
+ password=user["password"]
+ if not "tenantId" in user:
+ tenant_id=None
+ else:
+ tenant_id = user["tenantId"]
+ if not "email" in user:
+ email=None
+ else:
+ email = user["email"]
+ if "enabled" in user:
+ set_enabled = user["enabled"]
+ if not isinstance(set_enabled, bool):
+ raise fault.BadRequestFault("Bad enabled attribute!")
+ else:
+ set_enabled=True
+ if password == '':
+ password=user_id
+ return User(password,user_id,tenant_id,email,set_enabled)
+ except (ValueError, TypeError) as e:
+ raise fault.BadRequestFault("Cannot parse Tenant", str(e))
+
+ def to_dom(self):
+ dom = etree.Element("user",
+ xmlns="http://docs.openstack.org/idm/api/v1.0")
+ if self.email:
+ dom.set("email", self.email)
+ if self.tenant_id:
+ dom.set("tenantId",self.tenant_id)
+ if self.user_id:
+ dom.set("id",self.user_id)
+ if self.enabled:
+ dom.set("enabled",string.lower(str(self.enabled)))
+ if self.password:
+ dom.set("password",self.password)
+ if self.group:
+ print '78'
+ for group in self.group:
+ dom.append(group.to_dom())
+ return dom
+
+ def to_xml(self):
+ return etree.tostring(self.to_dom())
+
+ def to_dict(self):
+ user = {}
+
+ if self.user_id:
+ user["id"] = self.user_id
+ if self.user_id:
+ user["tenantId"]=self.tenant_id
+ if self.password:
+ user["password"]=self.password
+ if self.email:
+ user["email"]=self.email
+ if self.enabled:
+ user["enabled"]=self.enabled
+ if self.group:
+ values=[t.to_dict()["group"] for t in self.group]
+ user["groups"] = {"values": values}
+ return {'user': user}
+
+ def to_json(self):
+ return json.dumps(self.to_dict())
+
+class Users(object):
+ "A collection of users."
+
+ def __init__(self, values, links):
+ self.values = values
+ self.links = links
+
+ def to_xml(self):
+ dom = etree.Element("users")
+ dom.set(u"xmlns", "http://docs.openstack.org/idm/api/v1.0")
+ for t in self.values:
+ dom.append(t.to_dom())
+ for t in self.links:
+ dom.append(t.to_dom())
+ return etree.tostring(dom)
+
+ def to_json(self):
+ values = [t.to_dict()["user"] for t in self.values]
+ links = [t.to_dict()["links"] for t in self.links]
+ return json.dumps({"users": {"values": values, "links": links}}) \ No newline at end of file
diff --git a/test/unit/test_identity.py b/test/unit/test_identity.py
index 76125f79..ec537ab9 100644
--- a/test/unit/test_identity.py
+++ b/test/unit/test_identity.py
@@ -1,2093 +1,2092 @@
-import os
-import sys
-# Need to access identity module
-sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
- '..', '..', '..', '..', 'keystone')))
-from keystone import identity
-import unittest
-from webtest import TestApp
-import httplib2
-import json
-from lxml import etree
-import unittest
-from webtest import TestApp
-
-URL = 'http://localhost:8080/v1.0/'
-
-
-def get_token(user, pswd, kind=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": user,
- "password": pswd}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- token = str(content['auth']['token']['id'])
- if kind == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='', \
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": tenantid,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
-
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = {"group": {"id": groupid,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_tenant_group(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def create_global_group(auth_token):
- h = httplib2.Http(".cache")
-
- url = '%s/groups' % (URL)
- body = {"group": {"id": 'Admin',
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def delete_global_group(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": auth_token})
- return (resp, content)
-
-
-def get_token_xml(user, pswd, type=''):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="%s" username="%s" \
- tenantId="77654"/> ' % (pswd, user)
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- dom = etree.fromstring(content)
- root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
- token_root = root.attrib
- token = str(token_root['id'])
- if type == 'token':
- return token
- else:
- return (resp, content)
-
-
-def delete_token_xml(token, auth_token):
- h = httplib2.Http(".cache")
- url = '%stoken/%s' % (URL, token)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % tenantid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups' % (URL,tenantid)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % groupid
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_xml(tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenants/%s' % (URL, tenantid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_tenant_group_xml(groupid, tenantid, auth_token):
- h = httplib2.Http(".cache")
- url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def create_global_group_xml(auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="Admin"> \
- <description>A description...</description> \
- </group>'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def delete_global_group_xml(groupid, auth_token):
- h = httplib2.Http(".cache")
- url = '%s/groups/%s' % (URL, groupid)
- resp, content = h.request(url, "DELETE", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": auth_token,
- "ACCEPT": "application/xml"})
- return (resp, content)
-
-
-def get_tenant():
- return '1234'
-
-
-def get_user():
- return '1234'
-
-
-def get_userdisabled():
- return '1234'
-
-
-def get_auth_token():
- return '999888777666'
-
-
-def get_exp_auth_token():
- return '000999'
-
-
-def get_disabled_token():
- return '999888777'
-
-
-class identity_test(unittest.TestCase):
-
- #Given _a_ to make inherited test cases in an order.
- #here to call below method will call as last test case
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/json"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_get_version(self):
- h = httplib2.Http(".cache")
- url = URL
- resp, content = h.request(url, "GET", body="",
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
-
-class authorize_test(identity_test):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
-
-
- def tearDown(self):
- delete_token(self.token, self.auth_token)
-
- def test_a_authorize(self):
- resp, content = get_token('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_a_authorize_xml(self):
- resp, content = get_token_xml('joeuser', 'secrete')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_a_authorize_user_disaabled(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username": "disabled",
- "password": "self.tenant_group='test_tenant_group'secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_disaabled_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_a_authorize_user_wrong(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = {"passwordCredentials": {"username-w": "disabled",
- "password": "secrete"}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
- content = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_a_authorize_user_wrong_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken' % URL
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <passwordCredentials \
- xmlns="http://docs.openstack.org/idm/api/v1.0" \
- password="secrete" username-w="disabled" \
- />'
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
-
-class validate_token(authorize_test):
-
- def test_validate_token_true(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_true_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_expired(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_expired_xml(self):
- h = httplib2.Http(".cache")
-
- url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
- self.assertEqual('application/xml', resp['content-type'])
-
- def test_validate_token_invalid(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
- def test_validate_token_invalid_xml(self):
- h = httplib2.Http(".cache")
- url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
- self.tenant)
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/json", \
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
- self.assertEqual('application/json', resp['content-type'])
-
-
-class tenant_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
-
- def tearDown(self):
- resp, content = delete_tenant(self.tenant, self.auth_token)
-""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
-"""
-
-class create_tenant_test(tenant_test):
-
- def test_tenant_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
-
- resp, content = create_tenant('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- self.tenant = 'test_tenant'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(resp['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(resp['status']))
-
- def test_tenant_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- def test_tenant_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(resp['status']))
- if int(resp['status']) == 200:
- self.tenant = content.get("id")
-
- def test_tenant_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = {"tenant": {"id": self.tenant,
- "description": "A description ...",
- "enabled": True}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenants' % (URL)
- body = '{"tenant": { "id": "%s", \
- "description": "A description ...", "enabled"\
- :true } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenants' % (URL)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true" id="%s"> \
- <description>A description...</description> \
- </tenant>' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenants_test(tenant_test):
-
- def test_get_tenants(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenants_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenants_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenants_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_test(tenant_test):
-
- def test_get_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, 'tenant_bad')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_test(tenant_test):
-
- def test_update_tenant(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description": "A NEW description..." ,\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body['tenant']['id']))
- self.assertEqual('A NEW description...', \
- body['tenant']['description'])
-
- def test_update_tenant_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description>A NEW description...</description> \
- </tenant>'
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(int(self.tenant), int(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '{"tenant": { "description_bad": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/%s' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '{"tenant": { "description": "A NEW description...",\
- "enabled":true }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenants/NonexistingID' % (URL)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
- enabled="true"> \
- <description_bad>A NEW description...</description> \
- </tenant>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_test(tenant_test):
-
- def test_delete_tenant_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete111", \
- str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
- def test_delete_tenant_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(resp['status']))
-
-
-
-
-class tenant_group_test(unittest.TestCase):
-
- def setUp(self):
- self.token = get_token('joeuser', 'secrete', 'token')
- self.tenant = get_tenant()
- self.user = get_user()
- self.userdisabled = get_userdisabled()
- self.auth_token = get_auth_token()
- self.exp_auth_token = get_exp_auth_token()
- self.disabled_token = get_disabled_token()
- self.tenant_group = 'test_tenant_group'
-
- def tearDown(self):
- resp, content = delete_tenant_group('test_tenant_group', \
- self.tenant, self.auth_token)
- resp, content = delete_tenant(self.tenant, self.auth_token)
-
-
-class create_tenant_group_test(tenant_group_test):
-
- def test_tenant_group_create(self):
- resp, content = delete_tenant('test_tenant', str(self.auth_token))
- resp, content = create_tenant('test_tenant', str(self.auth_token))
-
- respG, contentG = delete_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- 'test_tenant', str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again(self):
-
- resp, content = create_tenant("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_groups_test(tenant_group_test):
-
- def test_get_tenant_groups(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
-
- url = '%stenant/%s/groups' % (URL,self.tenant)
-
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
-
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_get_tenant_groups_exp_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_get_tenant_groups_exp_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups' % (URL,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-class get_tenant_group_test(tenant_group_test):
-
- def test_get_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group_xml(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
-
- def test_get_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='{}',\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_get_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body='',\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class update_tenant_group_test(tenant_group_test):
-
- def test_update_tenant_group(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
-
- data = '{"group": { "id":"%s","description": "A NEW description..." ,\
- "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- body = json.loads(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(self.tenant_group, body['group']['id'])
- self.assertEqual('A NEW description...', \
- body['group']['description'])
-
- def test_update_tenant_group_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
-
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
-
- body = etree.fromstring(content)
- desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(200, int(resp['status']))
- self.assertEqual(str(self.tenant_group), str(body.get('id')))
- self.assertEqual('A NEW description...', \
- desc.text)
-
- def test_update_tenant_group_bad(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '{"group": { "description_bad": "A NEW description...",\
- "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
- #test for Content-Type = application/json
-
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_bad_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- tenantId="%s" id="%s"> \
- <description_bad>A NEW description...</description> \
- </group>' % (self.tenant, self.tenant_group)
- #test for Content-Type = application/json
- resp, content = h.request(url, "PUT", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(400, int(resp['status']))
-
- def test_update_tenant_group_not_found(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- respG, contentG = create_tenant_group(self.tenant_group,\
- self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
-
- data = '{"group": { "description": "A NEW description...",\
- "id":"NonexistingID", "tenantId"="test_tenant" }}'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
- def test_update_tenant_group_not_found_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant(self.tenant, str(self.auth_token))
- url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
- data = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="NonexistingID", "tenant_id"="test_tenant"> \
- <description_bad>A NEW description...</description> \
- </group>'
- #test for Content-Type = application/json
- resp, content = h.request(url, "GET", body=data,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": self.auth_token,
- "ACCEPT": "application/xml"})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(404, int(resp['status']))
-
-
-class delete_tenant_group_test(tenant_test):
-
- def test_delete_tenant_group_not_found(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group_not_found_xml(self):
- #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_group_xml("test_tenant_delete111", \
- "test_tenant", str(self.auth_token))
- self.assertEqual(404, int(resp['status']))
-
- def test_delete_tenant_group(self):
- resp, content = create_tenant("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
- def test_delete_tenant_group_xml(self):
- resp, content = create_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
- "test_tenant_delete", str(self.auth_token))
- resp, content = delete_tenant_xml("test_tenant_delete", \
- str(self.auth_token))
- self.assertEqual(204, int(respG['status']))
-
-class create_global_group_test(global_group_test):
-
- def test_global_group_create(self):
-
- respG, contentG = delete_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group(str(self.auth_token))
- self.group = 'test_tenant_group'
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- if int(respG['status']) not in (200, 201):
- self.fail('Failed due to %d' % int(respG['status']))
-
-
- def test_global_group_create_again(self):
-
- respG, contentG = create_global_group('test_tenant_group', \
- str(self.auth_token))
- respG, contentG = create_global_group('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content['tenant']['id']
- self.tenant_group = contentG['group']['id']
-
-
- def test_tenant_group_create_forbidden_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- if int(respG['status']) == 200:
- self.tenant_group = respG['group']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
-
- def test_tenant_group_create_expired_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."
- }}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json",
- "X-Auth-Token": self.exp_auth_token})
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = {"group": {"id": self.tenant_group,
- "description": "A description ..."}}
- resp, content = h.request(url, "POST", body=json.dumps(body),
- headers={"Content-Type": "application/json"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": self.disabled_token})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '{"group": { "id": "%s", \
- "description": "A description ..." } }' % self.tenant
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/json",\
- "X-Auth-Token": 'nonexsitingtoken'})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
- def test_tenant_group_create_xml(self):
- resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
- resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
- respG, contentG = delete_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- self.tenant = 'test_tenant'
- self.tenant_group = 'test_tenant_group'
- content = etree.fromstring(content)
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- if int(respG['status']) not in (200, 201):
-
- self.fail('Failed due to %d' % int(respG['status']))
-
- def test_tenant_group_create_again_xml(self):
-
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
-
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
- respG, contentG = create_tenant_group_xml('test_tenant_group', \
- "test_tenant", str(self.auth_token))
-
- content = etree.fromstring(content)
- contentG = etree.fromstring(contentG)
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- if int(respG['status']) == 500:
- self.fail('IDM fault')
- elif int(respG['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(409, int(respG['status']))
- if int(respG['status']) == 200:
- self.tenant = content.get("id")
- self.tenant_group = contentG.get("id")
-
- def test_tenant_group_create_forbidden_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant("test_tenant", str(self.auth_token))
- if int(resp['status']) == 200:
- self.tenant = content['tenant']['id']
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
-
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_expired_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant
-
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml", \
- "X-Auth-Token": self.exp_auth_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_missing_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
-
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
- def test_tenant_group_create_disabled_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.disabled_token,
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(403, int(resp['status']))
-
- def test_tenant_group_create_invalid_token_xml(self):
- h = httplib2.Http(".cache")
- resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
- content = etree.fromstring(content)
- if int(resp['status']) == 200:
- self.tenant = content.get('id')
-
- url = '%stenant/%s/groups' % (URL, self.tenant)
- body = '<?xml version="1.0" encoding="UTF-8"?> \
- <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
- id="%s"> \
- <description>A description...</description> \
- </group>' % self.tenant_group
- resp, content = h.request(url, "POST", body=body,\
- headers={"Content-Type": "application/xml",\
- "X-Auth-Token": 'nonexsitingtoken',
- "ACCEPT": "application/xml"})
-
- if int(resp['status']) == 500:
- self.fail('IDM fault')
- elif int(resp['status']) == 503:
- self.fail('Service Not Available')
- self.assertEqual(401, int(resp['status']))
-
-
-
-if __name__ == '__main__':
- unittest.main()
+import os
+import sys
+# Need to access identity module
+sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
+ '..', '..', '..', '..', 'keystone')))
+from keystone import identity
+import unittest
+from webtest import TestApp
+import httplib2
+import json
+from lxml import etree
+import unittest
+from webtest import TestApp
+
+URL = 'http://localhost:8080/v1.0/'
+
+
+def get_token(user, pswd, kind=''):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username": user,
+ "password": pswd}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ token = str(content['auth']['token']['id'])
+ if kind == 'token':
+ return token
+ else:
+ return (resp, content)
+
+
+def delete_token(token, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s' % (URL, token)
+ resp, content = h.request(url, "DELETE", body='', \
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_tenant(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": tenantid,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_tenant_group(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%stenant/%s/groups' % (URL,tenantid)
+ body = {"group": {"id": groupid,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_tenant(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants/%s' % (URL, tenantid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_tenant_group(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def create_global_group(auth_token):
+ h = httplib2.Http(".cache")
+
+ url = '%s/groups' % (URL)
+ body = {"group": {"id": 'Admin',
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def delete_global_group(groupid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups/%s' % (URL, groupid)
+ resp, content = h.request(url, "DELETE", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": auth_token})
+ return (resp, content)
+
+
+def get_token_xml(user, pswd, type=''):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="%s" username="%s" \
+ tenantId="77654"/> ' % (pswd, user)
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ dom = etree.fromstring(content)
+ root = dom.find("{http://docs.openstack.org/idm/api/v1.0}token")
+ token_root = root.attrib
+ token = str(token_root['id'])
+ if type == 'token':
+ return token
+ else:
+ return (resp, content)
+
+
+def delete_token_xml(token, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s' % (URL, token)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_tenant_xml(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % tenantid
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_tenant_group_xml(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups' % (URL,tenantid)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % groupid
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_tenant_xml(tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenants/%s' % (URL, tenantid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_tenant_group_xml(groupid, tenantid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%stenant/%s/groups/%s' % (URL, tenantid, groupid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def create_global_group_xml(auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="Admin"> \
+ <description>A description...</description> \
+ </group>'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def delete_global_group_xml(groupid, auth_token):
+ h = httplib2.Http(".cache")
+ url = '%s/groups/%s' % (URL, groupid)
+ resp, content = h.request(url, "DELETE", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": auth_token,
+ "ACCEPT": "application/xml"})
+ return (resp, content)
+
+
+def get_tenant():
+ return '1234'
+
+
+def get_user():
+ return '1234'
+
+
+def get_userdisabled():
+ return '1234'
+
+
+def get_auth_token():
+ return '999888777666'
+
+
+def get_exp_auth_token():
+ return '000999'
+
+
+def get_disabled_token():
+ return '999888777'
+
+
+class identity_test(unittest.TestCase):
+
+ #Given _a_ to make inherited test cases in an order.
+ #here to call below method will call as last test case
+
+ def test_a_get_version(self):
+ h = httplib2.Http(".cache")
+ url = URL
+ resp, content = h.request(url, "GET", body="",
+ headers={"Content-Type": "application/json"})
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_a_get_version(self):
+ h = httplib2.Http(".cache")
+ url = URL
+ resp, content = h.request(url, "GET", body="",
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+
+class authorize_test(identity_test):
+
+ def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+
+
+
+ def tearDown(self):
+ delete_token(self.token, self.auth_token)
+
+ def test_a_authorize(self):
+ resp, content = get_token('joeuser', 'secrete')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_a_authorize_xml(self):
+ resp, content = get_token_xml('joeuser', 'secrete')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_a_authorize_user_disaabled(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username": "disabled",
+ "password": "self.tenant_group='test_tenant_group'secrete"}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_a_authorize_user_disaabled_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="secrete" username="disabled" \
+ />'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_a_authorize_user_wrong(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = {"passwordCredentials": {"username-w": "disabled",
+ "password": "secrete"}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+ content = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_a_authorize_user_wrong_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken' % URL
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <passwordCredentials \
+ xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ password="secrete" username-w="disabled" \
+ />'
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+
+class validate_token(authorize_test):
+
+ def test_validate_token_true(self):
+ h = httplib2.Http(".cache")
+
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_true_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.token, self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_validate_token_expired(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_expired_xml(self):
+ h = httplib2.Http(".cache")
+
+ url = '%stoken/%s?belongsTo=%s' % (URL, self.exp_auth_token, \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+ self.assertEqual('application/xml', resp['content-type'])
+
+ def test_validate_token_invalid(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+ def test_validate_token_invalid_xml(self):
+ h = httplib2.Http(".cache")
+ url = '%stoken/%s?belongsTo=%s' % (URL, 'NonExistingToken', \
+ self.tenant)
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/json", \
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+ self.assertEqual('application/json', resp['content-type'])
+
+
+class tenant_test(unittest.TestCase):
+
+ def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+
+ def tearDown(self):
+ resp, content = delete_tenant(self.tenant, self.auth_token)
+""" "passwordCredentials" : {"username" : "joeuser","password": "secrete","tenantId": "1234"}
+"""
+
+class create_tenant_test(tenant_test):
+
+ def test_tenant_create(self):
+ resp, content = delete_tenant('test_tenant', str(self.auth_token))
+
+ resp, content = create_tenant('test_tenant', str(self.auth_token))
+ self.tenant = 'test_tenant'
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(resp['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(resp['status']))
+
+ def test_tenant_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ self.tenant = 'test_tenant'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(resp['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(resp['status']))
+
+ def test_tenant_create_again(self):
+
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(resp['status']))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ def test_tenant_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get("id")
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(resp['status']))
+ if int(resp['status']) == 200:
+ self.tenant = content.get("id")
+
+ def test_tenant_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = {"tenant": {"id": self.tenant,
+ "description": "A description ...",
+ "enabled": True}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = '{"tenant": { "id": "%s", \
+ "description": "A description ...", "enabled"\
+ :true } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_tenant_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenants' % (URL)
+ body = '{"tenant": { "id": "%s", \
+ "description": "A description ...", "enabled"\
+ :true } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_tenant_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenants' % (URL)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true" id="%s"> \
+ <description>A description...</description> \
+ </tenant>' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenants_test(tenant_test):
+
+ def test_get_tenants(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenants_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenants_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenants_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_tenants_exp_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_tenants_exp_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_test(tenant_test):
+
+ def test_get_tenant(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ def test_get_tenant_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, 'tenant_bad')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, 'tenant_bad')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_tenant_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class update_tenant_test(tenant_test):
+
+ def test_update_tenant(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '{"tenant": { "description": "A NEW description..." ,\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ body = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(int(self.tenant), int(body['tenant']['id']))
+ self.assertEqual('A NEW description...', \
+ body['tenant']['description'])
+
+ def test_update_tenant_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description>A NEW description...</description> \
+ </tenant>'
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ body = etree.fromstring(content)
+ desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(int(self.tenant), int(body.get('id')))
+ self.assertEqual('A NEW description...', \
+ desc.text)
+
+ def test_update_tenant_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '{"tenant": { "description_bad": "A NEW description...",\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/%s' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description_bad>A NEW description...</description> \
+ </tenant>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+ def test_update_tenant_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ data = '{"tenant": { "description": "A NEW description...",\
+ "enabled":true }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_update_tenant_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenants/NonexistingID' % (URL)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <tenant xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ enabled="true"> \
+ <description_bad>A NEW description...</description> \
+ </tenant>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class delete_tenant_test(tenant_test):
+
+ def test_delete_tenant_not_found(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete111", \
+ str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant_not_found_xml(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete111", \
+ str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+ def test_delete_tenant(self):
+ resp, content = create_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(resp['status']))
+
+ def test_delete_tenant_xml(self):
+ resp, content = create_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(resp['status']))
+
+
+
+
+class tenant_group_test(unittest.TestCase):
+
+def setUp(self):
+ self.token = get_token('joeuser', 'secrete', 'token')
+ self.tenant = get_tenant()
+ self.user = get_user()
+ self.userdisabled = get_userdisabled()
+ self.auth_token = get_auth_token()
+ self.exp_auth_token = get_exp_auth_token()
+ self.disabled_token = get_disabled_token()
+ self.tenant_group = 'test_tenant_group'
+
+def tearDown(self):
+ resp, content = delete_tenant_group('test_tenant_group', \
+ self.tenant, self.auth_token)
+ resp, content = delete_tenant(self.tenant, self.auth_token)
+
+
+class create_tenant_group_test(tenant_group_test):
+
+def test_tenant_group_create(self):
+ resp, content = delete_tenant('test_tenant', str(self.auth_token))
+ resp, content = create_tenant('test_tenant', str(self.auth_token))
+
+ respG, contentG = delete_tenant_group('test_tenant_group', \
+ 'test_tenant', str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ 'test_tenant', str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+def test_tenant_group_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+def test_tenant_group_create_again(self):
+
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+
+def test_tenant_group_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ content = etree.fromstring(content)
+ contentG = etree.fromstring(contentG)
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+def test_tenant_group_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ if int(respG['status']) == 200:
+ self.tenant_group = respG['group']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_groups_test(tenant_group_test):
+
+def test_get_tenant_groups(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+def test_get_tenant_groups_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+def test_get_tenant_groups_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_get_tenant_groups_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_get_tenant_groups_exp_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_get_tenant_groups_exp_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups' % (URL,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+class get_tenant_group_test(tenant_group_test):
+
+def test_get_tenant_group(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+def test_get_tenant_group_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group_xml(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+def test_get_tenant_group_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+def test_get_tenant_group_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,'tenant_bad',self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+def test_get_tenant_group_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='{}',\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+def test_get_tenant_group_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,'nonexistinggroup')
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body='',\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class update_tenant_group_test(tenant_group_test):
+
+def test_update_tenant_group(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+
+ data = '{"group": { "id":"%s","description": "A NEW description..." ,\
+ "tenantId":"%s" }}' % (self.tenant_group,self.tenant)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ body = json.loads(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(self.tenant_group, body['group']['id'])
+ self.assertEqual('A NEW description...', \
+ body['group']['description'])
+
+def test_update_tenant_group_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL, self.tenant ,self.tenant_group)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ tenantId="%s" id="%s"> \
+ <description>A NEW description...</description> \
+ </group>' % (self.tenant, self.tenant_group)
+
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+
+ body = etree.fromstring(content)
+ desc = body.find("{http://docs.openstack.org/idm/api/v1.0}description")
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+ self.assertEqual(str(self.tenant_group), str(body.get('id')))
+ self.assertEqual('A NEW description...', \
+ desc.text)
+
+def test_update_tenant_group_bad(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ data = '{"group": { "description_bad": "A NEW description...",\
+ "id":"%s","tenantId":"%s" }}' % (self.tenant_group,self.tenant)
+ #test for Content-Type = application/json
+
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+def test_update_tenant_group_bad_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/%s' % (URL,self.tenant,self.tenant_group)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ tenantId="%s" id="%s"> \
+ <description_bad>A NEW description...</description> \
+ </group>' % (self.tenant, self.tenant_group)
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "PUT", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(400, int(resp['status']))
+
+def test_update_tenant_group_not_found(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ respG, contentG = create_tenant_group(self.tenant_group,\
+ self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+
+ data = '{"group": { "description": "A NEW description...",\
+ "id":"NonexistingID", "tenantId"="test_tenant" }}'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+def test_update_tenant_group_not_found_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant(self.tenant, str(self.auth_token))
+ url = '%stenant/%s/groups/NonexistingID' % (URL, self.tenant)
+ data = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="NonexistingID", "tenant_id"="test_tenant"> \
+ <description_bad>A NEW description...</description> \
+ </group>'
+ #test for Content-Type = application/json
+ resp, content = h.request(url, "GET", body=data,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
+class delete_tenant_group_test(tenant_test):
+
+def test_delete_tenant_group_not_found(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_group("test_tenant_delete111", \
+ "test_tenant", str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+def test_delete_tenant_group_not_found_xml(self):
+ #resp,content=create_tenant("test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_group_xml("test_tenant_delete111", \
+ "test_tenant", str(self.auth_token))
+ self.assertEqual(404, int(resp['status']))
+
+def test_delete_tenant_group(self):
+ resp, content = create_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ respG, contentG = create_tenant_group('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ respG, contentG = delete_tenant_group('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(respG['status']))
+
+def test_delete_tenant_group_xml(self):
+ resp, content = create_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group_delete', \
+ "test_tenant_delete", str(self.auth_token))
+ resp, content = delete_tenant_xml("test_tenant_delete", \
+ str(self.auth_token))
+ self.assertEqual(204, int(respG['status']))
+
+class create_global_group_test(global_group_test):
+
+def test_global_group_create(self):
+
+ respG, contentG = delete_global_group('test_tenant_group', \
+ str(self.auth_token))
+ respG, contentG = create_global_group(str(self.auth_token))
+ self.group = 'test_tenant_group'
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ if int(respG['status']) not in (200, 201):
+ self.fail('Failed due to %d' % int(respG['status']))
+
+
+def test_global_group_create_again(self):
+
+ respG, contentG = create_global_group('test_tenant_group', \
+ str(self.auth_token))
+ respG, contentG = create_global_group('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content['tenant']['id']
+ self.tenant_group = contentG['group']['id']
+
+
+def test_tenant_group_create_forbidden_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ if int(respG['status']) == 200:
+ self.tenant_group = respG['group']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+
+def test_tenant_group_create_expired_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."
+ }}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_missing_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = {"group": {"id": self.tenant_group,
+ "description": "A description ..."}}
+ resp, content = h.request(url, "POST", body=json.dumps(body),
+ headers={"Content-Type": "application/json"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_disabled_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": self.disabled_token})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_invalid_token(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '{"group": { "id": "%s", \
+ "description": "A description ..." } }' % self.tenant
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/json",\
+ "X-Auth-Token": 'nonexsitingtoken'})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+
+def test_tenant_group_create_xml(self):
+ resp, content = delete_tenant_xml('test_tenant', str(self.auth_token))
+ resp, content = create_tenant_xml('test_tenant', str(self.auth_token))
+ respG, contentG = delete_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ self.tenant = 'test_tenant'
+ self.tenant_group = 'test_tenant_group'
+ content = etree.fromstring(content)
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ if int(respG['status']) not in (200, 201):
+
+ self.fail('Failed due to %d' % int(respG['status']))
+
+def test_tenant_group_create_again_xml(self):
+
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+ respG, contentG = create_tenant_group_xml('test_tenant_group', \
+ "test_tenant", str(self.auth_token))
+
+ content = etree.fromstring(content)
+ contentG = etree.fromstring(contentG)
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+ if int(respG['status']) == 500:
+ self.fail('IDM fault')
+ elif int(respG['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(409, int(respG['status']))
+ if int(respG['status']) == 200:
+ self.tenant = content.get("id")
+ self.tenant_group = contentG.get("id")
+
+def test_tenant_group_create_forbidden_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant("test_tenant", str(self.auth_token))
+ if int(resp['status']) == 200:
+ self.tenant = content['tenant']['id']
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_expired_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant
+
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml", \
+ "X-Auth-Token": self.exp_auth_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_missing_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+def test_tenant_group_create_disabled_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+def test_tenant_group_create_invalid_token_xml(self):
+ h = httplib2.Http(".cache")
+ resp, content = create_tenant_xml("test_tenant", str(self.auth_token))
+ content = etree.fromstring(content)
+ if int(resp['status']) == 200:
+ self.tenant = content.get('id')
+
+ url = '%stenant/%s/groups' % (URL, self.tenant)
+ body = '<?xml version="1.0" encoding="UTF-8"?> \
+ <group xmlns="http://docs.openstack.org/idm/api/v1.0" \
+ id="%s"> \
+ <description>A description...</description> \
+ </group>' % self.tenant_group
+ resp, content = h.request(url, "POST", body=body,\
+ headers={"Content-Type": "application/xml",\
+ "X-Auth-Token": 'nonexsitingtoken',
+ "ACCEPT": "application/xml"})
+
+ if int(resp['status']) == 500:
+ self.fail('IDM fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+
+if __name__ == '__main__':
+ unittest.main()