summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZiad Sawalha <github@highbridgellc.com>2011-05-25 20:30:26 -0500
committerZiad Sawalha <github@highbridgellc.com>2011-05-25 20:30:26 -0500
commitc7eae23eaaf2041778db4e1e913629babde61a35 (patch)
tree525adade2374502d940c6be15725f5af5e206e08
parentb0d12a558f590a501a42afd1283148961563920e (diff)
parent61c2444f76eae5fb24896c904ec1ed67b6a722aa (diff)
downloadkeystone-c7eae23eaaf2041778db4e1e913629babde61a35.tar.gz
keystone-c7eae23eaaf2041778db4e1e913629babde61a35.tar.xz
keystone-c7eae23eaaf2041778db4e1e913629babde61a35.zip
Merge https://github.com/yogirackspace/keystone
-rwxr-xr-xbin/keystone-manage27
-rwxr-xr-xbin/sampledata.sh3
-rw-r--r--docs/guide/src/docbkx/identitydevguide.xml3
-rw-r--r--docs/guide/src/docbkx/samples/role.json2
-rw-r--r--docs/guide/src/docbkx/samples/role.xml2
-rw-r--r--docs/guide/src/docbkx/samples/roles.json8
-rw-r--r--docs/guide/src/docbkx/samples/roles.xml4
-rw-r--r--keystone/db/sqlalchemy/api.py145
-rw-r--r--keystone/db/sqlalchemy/models.py27
-rw-r--r--keystone/logic/service.py31
-rw-r--r--keystone/logic/types/baseURL.py97
-rw-r--r--keystone/server.py46
-rw-r--r--test/unit/test_BaseURLs.py136
-rw-r--r--test/unit/test_common.py1
-rw-r--r--test/unit/test_roles.py388
15 files changed, 885 insertions, 35 deletions
diff --git a/bin/keystone-manage b/bin/keystone-manage
index 6b17792d..51d94bab 100755
--- a/bin/keystone-manage
+++ b/bin/keystone-manage
@@ -74,7 +74,7 @@ def Main():
parser.error('No object type specified for first argument')
object_type = args[0]
- if object_type in ['user', 'tenant', 'role', 'token']:
+ if object_type in ['user', 'tenant', 'role', 'baseURLs' , 'token']:
pass
else:
parser.error('%s is not a supported object type' % object_type)
@@ -234,6 +234,31 @@ def Main():
except Exception as exc:
print "ERROR: Failed to grant role: %s" % exc
return
+ elif object_type == "baseURLs":
+ if command == "add":
+ if len(args) < 8:
+ parser.error("Missing arguments: baseURLs add 'region' 'service'"\
+ "'publicURL' 'adminURL' 'internalURL' 'enabled'")
+ region = args[2]
+ service = args[3]
+ public_url = args[4]
+ admin_url = args[5]
+ internal_url = args[6]
+ enabled = args[7]
+ try:
+ object = db_models.BaseUrls()
+ object.region = region
+ object.service = service
+ object.public_url = public_url
+ object.admin_url = admin_url
+ object.internal_url = internal_url
+ object.enabled = enabled
+ object = db_api.baseurls_create(object)
+ print "BaseURl created successfully. ID=%s" % object.id
+ return
+ except Exception as exc:
+ print "ERROR: Failed to create BaseURLs: %s" % exc
+ return
elif object_type == "token":
if command == "add":
if len(args) < 6:
diff --git a/bin/sampledata.sh b/bin/sampledata.sh
index 0e58198b..67b7b86b 100755
--- a/bin/sampledata.sh
+++ b/bin/sampledata.sh
@@ -34,7 +34,8 @@
./keystone-manage $* role add Admin
./keystone-manage $* role grant Admin admin 1234
-
+#BaseURLs
+./keystone-manage $* baseURLs add DFW cloudFiles public.cloudfiles.com admin.cloudfiles.com internal.cloudfiles.com 1
# Groups
#./keystone-manage $* group add Admin 1234
#./keystone-manage $* group add Default 1234
diff --git a/docs/guide/src/docbkx/identitydevguide.xml b/docs/guide/src/docbkx/identitydevguide.xml
index 23af253a..8e034307 100644
--- a/docs/guide/src/docbkx/identitydevguide.xml
+++ b/docs/guide/src/docbkx/identitydevguide.xml
@@ -1251,8 +1251,7 @@ Host: identity.api.openstack.org/v1.1/
<tr>
<td colspan="1">&GET;
</td>
- <td colspan="4">/roles?<parameter>serviceName</parameter>=<literal>ServiceName</literal>
- </td>
+ <td colspan="4">/roles </td>
<td colspan="3">
Get a list of roles.
</td>
diff --git a/docs/guide/src/docbkx/samples/role.json b/docs/guide/src/docbkx/samples/role.json
index d52e2c76..904801ee 100644
--- a/docs/guide/src/docbkx/samples/role.json
+++ b/docs/guide/src/docbkx/samples/role.json
@@ -2,6 +2,6 @@
"role" :
{
"id" : "Admin",
- "description" : "cloudFiles"
+ "description" : "All access"
}
} \ No newline at end of file
diff --git a/docs/guide/src/docbkx/samples/role.xml b/docs/guide/src/docbkx/samples/role.xml
index 5a4ecf19..48fe20d8 100644
--- a/docs/guide/src/docbkx/samples/role.xml
+++ b/docs/guide/src/docbkx/samples/role.xml
@@ -1,4 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<role xmlns="http://docs.openstack.org/identity/api/v2.0" id="Admin"
- description="cloudFiles" />
+ description="All Access" />
diff --git a/docs/guide/src/docbkx/samples/roles.json b/docs/guide/src/docbkx/samples/roles.json
index 5d5636ea..4f431806 100644
--- a/docs/guide/src/docbkx/samples/roles.json
+++ b/docs/guide/src/docbkx/samples/roles.json
@@ -1,12 +1,12 @@
{
"roles" : [
{
- "id" : 1,
- "description" : "cloudFiles"
+ "id" : "Admin",
+ "description" : "All access"
},
{
- "id" : 2,
- "description" : "cloudFiles"
+ "id" : "Guest",
+ "description" : "Guest Access"
},
]
}
diff --git a/docs/guide/src/docbkx/samples/roles.xml b/docs/guide/src/docbkx/samples/roles.xml
index 53fbdf73..bf128ff4 100644
--- a/docs/guide/src/docbkx/samples/roles.xml
+++ b/docs/guide/src/docbkx/samples/roles.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<roles xmlns="http://docs.openstack.org/identity/api/v2.0">
- <role id="4" description="cloudFilesCDN" />
- <role id="5" description="cloudServers" />
+ <role id="Admin" description="All Access" />
+ <role id="Guest" description="Guest Access" />
</roles> \ No newline at end of file
diff --git a/keystone/db/sqlalchemy/api.py b/keystone/db/sqlalchemy/api.py
index 3f53ab65..caf872ed 100644
--- a/keystone/db/sqlalchemy/api.py
+++ b/keystone/db/sqlalchemy/api.py
@@ -211,6 +211,7 @@ def tenant_get_page_markers(marker, limit, session=None):
return (prev, next)
+
def tenant_is_empty(id, session=None):
if not session:
session = get_session()
@@ -871,3 +872,147 @@ def groups_get_by_user_get_page_markers(user_id, marker, limit, session=None):
else:
next = next.id
return (prev, next)
+
+def role_get_page_markers(marker, limit, session=None):
+ if not session:
+ session = get_session()
+ first = session.query(models.Role).order_by(\
+ models.Role.id).first()
+ last = session.query(models.Role).order_by(\
+ models.Role.id.desc()).first()
+ if first is None:
+ return (None, None)
+ if marker is None:
+ marker = first.id
+ next = session.query(models.Role).filter("id > :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.Role.id).limit(limit).all()
+ prev = session.query(models.Role).filter("id < :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.Role.id.desc()).limit(int(limit)).all()
+ if len(next) == 0:
+ next = last
+ else:
+ for t in next:
+ next = t
+ if len(prev) == 0:
+ prev = first
+ else:
+ for t in prev:
+ prev = t
+ if prev.id == marker:
+ prev = None
+ else:
+ prev = prev.id
+ if next.id == last.id:
+ next = None
+ else:
+ next = next.id
+ return (prev, next)
+
+def role_ref_get_page_markers(user_id, marker, limit, session=None):
+ if not session:
+ session = get_session()
+ first = session.query(models.UserRoleAssociation).filter_by(user_id=user_id).order_by(\
+ models.UserRoleAssociation.id).first()
+ last = session.query(models.UserRoleAssociation).filter_by(user_id=user_id).order_by(\
+ models.UserRoleAssociation.id.desc()).first()
+ if first is None:
+ return (None, None)
+ if marker is None:
+ marker = first.id
+ next = session.query(models.UserRoleAssociation).filter_by(user_id=user_id).filter("id > :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.UserRoleAssociation.id).limit(limit).all()
+ prev = session.query(models.UserRoleAssociation).filter_by(user_id=user_id).filter("id < :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.UserRoleAssociation.id.desc()).limit(int(limit)).all()
+ if len(next) == 0:
+ next = last
+ else:
+ for t in next:
+ next = t
+ if len(prev) == 0:
+ prev = first
+ else:
+ for t in prev:
+ prev = t
+ if prev.id == marker:
+ prev = None
+ else:
+ prev = prev.id
+ if next.id == last.id:
+ next = None
+ else:
+ next = next.id
+ return (prev, next)
+
+#
+# BaseURL API operations
+#
+
+def baseurls_create(values):
+ baseurls_ref = models.BaseUrls()
+ baseurls_ref.update(values)
+ baseurls_ref.save()
+ return baseurls_ref
+
+def baseurls_get(id, session=None):
+ if not session:
+ session = get_session()
+ result = session.query(models.BaseUrls).filter_by(id=id).first()
+ return result
+
+def baseurls_get_all(session=None):
+ if not session:
+ session = get_session()
+ return session.query(models.BaseUrls).all()
+
+def baseurls_get_page(marker, limit, session=None):
+ if not session:
+ session = get_session()
+
+ if marker:
+ return session.query(models.BaseUrls).filter("id>:marker").params(\
+ marker='%s' % marker).order_by(\
+ models.BaseUrls.id.desc()).limit(limit).all()
+ else:
+ return session.query(models.BaseUrls).order_by(\
+ models.BaseUrls.id.desc()).limit(limit).all()
+
+def baseurls_get_page_markers(marker, limit, session=None):
+ if not session:
+ session = get_session()
+ first = session.query(models.BaseUrls).order_by(\
+ models.BaseUrls.id).first()
+ last = session.query(models.BaseUrls).order_by(\
+ models.BaseUrls.id.desc()).first()
+ if first is None:
+ return (None, None)
+ if marker is None:
+ marker = first.id
+ next = session.query(models.BaseUrls).filter("id > :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.BaseUrls.id).limit(limit).all()
+ prev = session.query(models.BaseUrls).filter("id < :marker").params(\
+ marker='%s' % marker).order_by(\
+ models.BaseUrls.id.desc()).limit(int(limit)).all()
+ if len(next) == 0:
+ next = last
+ else:
+ for t in next:
+ next = t
+ if len(prev) == 0:
+ prev = first
+ else:
+ for t in prev:
+ prev = t
+ if prev.id == marker:
+ prev = None
+ else:
+ prev = prev.id
+ if next.id == last.id:
+ next = None
+ else:
+ next = next.id
+ return (prev, next)
diff --git a/keystone/db/sqlalchemy/models.py b/keystone/db/sqlalchemy/models.py
index 11785b63..11cf1a05 100644
--- a/keystone/db/sqlalchemy/models.py
+++ b/keystone/db/sqlalchemy/models.py
@@ -15,7 +15,7 @@
# limitations under the License.
# Not Yet PEP8 standardized
-from sqlalchemy import create_engine, Column, String, Integer, ForeignKey, UniqueConstraint
+from sqlalchemy import create_engine, Column, String, Integer, ForeignKey, UniqueConstraint, Boolean
from sqlalchemy import DateTime
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
@@ -88,8 +88,14 @@ class UserRoleAssociation(Base, KeystoneBase):
user_id = Column(String(255), ForeignKey('users.id'))
role_id = Column(String(255), ForeignKey('roles.id'))
tenant_id = Column(String(255), ForeignKey('tenants.id'))
- UniqueConstraint('user_id', 'role_id', 'tenant_id', name='user_role_tenant_uniquness')
+ __table_args__ = (UniqueConstraint("user_id", "role_id", "tenant_id"), {} )
+class TenantBaseURLAssociation(Base, KeystoneBase):
+ __tablename__ = 'tenant_baseURLs'
+ id = Column(Integer, primary_key=True)
+ tenant_id = Column(String(255), ForeignKey('tenants.id'))
+ baseURLs_id = Column(Integer, ForeignKey('urlbase.id'))
+ __table_args__ = (UniqueConstraint("baseURLs_id", "tenant_id"), {} )
# Define objects
class Role(Base, KeystoneBase):
@@ -106,6 +112,7 @@ class Tenant(Base, KeystoneBase):
desc = Column(String(255))
enabled = Column(Integer)
groups = relationship('Group', backref='tenants')
+ endpoints = relationship('TenantBaseURLAssociation', backref='tenant',cascade="all")
class User(Base, KeystoneBase):
@@ -118,7 +125,7 @@ class User(Base, KeystoneBase):
tenant_id = Column(String(255), ForeignKey('tenants.id'))
groups = relationship(UserGroupAssociation, backref='users')
- roles = relationship(UserRoleAssociation,cascade="all,delete")
+ roles = relationship(UserRoleAssociation, cascade="all")
class Credentials(Base, KeystoneBase):
__tablename__ = 'credentials'
@@ -146,9 +153,13 @@ class Token(Base, KeystoneBase):
expires = Column(DateTime)
-class Endpoints(Base, KeystoneBase):
- __tablename__ = 'endpoints'
-
- id = Column(String(255), primary_key=True, unique=True)
+class BaseUrls(Base, KeystoneBase):
+ __tablename__ = 'urlbase'
+
+ id = Column(Integer, primary_key=True)
+ region = Column(String(255))
service = Column(String(255))
- desc = Column(String(255))
+ public_url = Column(String(2000))
+ admin_url = Column(String(2000))
+ internal_url = Column(String(2000))
+ enabled = Column(Boolean) \ No newline at end of file
diff --git a/keystone/logic/service.py b/keystone/logic/service.py
index 4f60cb9a..7e6c07ff 100644
--- a/keystone/logic/service.py
+++ b/keystone/logic/service.py
@@ -25,7 +25,7 @@ import keystone.logic.types.fault as fault
import keystone.logic.types.tenant as tenants
import keystone.logic.types.role as roles
import keystone.logic.types.user as users
-
+import keystone.logic.types.baseURL as baseURLs
class IdentityService(object):
"This is the logical implemenation of the Identity service"
@@ -892,7 +892,7 @@ class IdentityService(object):
for drole in droles:
ts.append(roles.Role(drole.id,
drole.desc))
- prev, next = db_api.tenant_get_page_markers(marker, limit)
+ prev, next = db_api.role_get_page_markers(marker, limit)
links = []
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" \
@@ -959,7 +959,7 @@ class IdentityService(object):
for droleRef in droleRefs:
ts.append(roles.RoleRef(droleRef.id,droleRef.role_id,
droleRef.tenant_id))
- prev, next = db_api.tenant_get_page_markers(marker, limit)
+ prev, next = db_api.role_ref_get_page_markers(user_id, marker, limit)
links = []
if prev:
links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" \
@@ -968,4 +968,29 @@ class IdentityService(object):
links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" \
% (url, next, limit)))
return roles.RoleRefs(ts, links)
+
+ def get_baseurls(self, admin_token, marker, limit, url):
+ self.__validate_token(admin_token)
+
+ ts = []
+ dbaseurls = db_api.baseurls_get_page(marker, limit)
+ for dbaseurl in dbaseurls:
+ ts.append(baseURLs.BaseURL(dbaseurl.id, dbaseurl.region, dbaseurl.service, dbaseurl.public_url, dbaseurl.admin_url, dbaseurl.internal_url, dbaseurl.enabled))
+ prev, next = db_api.baseurls_get_page_markers(marker, limit)
+ links = []
+ if prev:
+ links.append(atom.Link('prev', "%s?'marker=%s&limit=%s'" \
+ % (url, prev, limit)))
+ if next:
+ links.append(atom.Link('next', "%s?'marker=%s&limit=%s'" \
+ % (url, next, limit)))
+ return baseURLs.BaseURLs(ts, links)
+
+ def get_baseurl(self, admin_token, baseurl_id):
+ self.__validate_token(admin_token)
+
+ dbaseurl = db_api.baseurls_get(baseurl_id)
+ if not dbaseurl:
+ raise fault.ItemNotFoundFault("The role could not be found")
+ return baseURLs.BaseURL(dbaseurl.id, dbaseurl.region, dbaseurl.service, dbaseurl.public_url, dbaseurl.admin_url, dbaseurl.internal_url, dbaseurl.enabled)
diff --git a/keystone/logic/types/baseURL.py b/keystone/logic/types/baseURL.py
new file mode 100644
index 00000000..2b4997f7
--- /dev/null
+++ b/keystone/logic/types/baseURL.py
@@ -0,0 +1,97 @@
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from lxml import etree
+import string
+
+import keystone.logic.types.fault as fault
+class BaseURL(object):
+ def __init__(self, id, region, service, public_url, admin_url, internal_url, enabled):
+ self.id = id
+ self.region = region
+ self.service = service
+ self.public_url = public_url
+ self.admin_url = admin_url
+ self.internal_url = internal_url
+ self.enabled = enabled
+
+ def to_dom(self):
+ dom = etree.Element("baseURL",
+ xmlns="http://docs.openstack.org/identity/api/v2.0")
+ if self.id:
+ dom.set("id", str(self.id))
+ if self.region:
+ dom.set("region", self.region)
+ if self.service:
+ dom.set("serviceName", self.service)
+ if self.public_url:
+ dom.set("publicURL", self.public_url)
+ if self.admin_url:
+ dom.set("adminURL", self.admin_url)
+ if self.internal_url:
+ dom.set("internalURL", self.internal_url)
+ if self.enabled:
+ dom.set("enabled", 'true')
+ return dom
+
+ def to_xml(self):
+ return etree.tostring(self.to_dom())
+
+ def to_dict(self):
+ baseURL = {}
+ if self.id:
+ baseURL["id"] = self.id
+ if self.region:
+ baseURL["region"] = self.region
+ if self.service:
+ baseURL["serviceName"] = self.service
+ if self.public_url:
+ baseURL["publicURL"] = self.public_url
+ if self.admin_url:
+ baseURL["adminURL"] = self.admin_url
+ if self.internal_url:
+ baseURL["internalURL"] = self.internal_url
+ if self.enabled:
+ baseURL["enabled"] = self.enabled
+ return {'baseURL': baseURL}
+
+ def to_json(self):
+ return json.dumps(self.to_dict())
+
+class BaseURLs(object):
+ "A collection of baseURls."
+
+ def __init__(self, values, links):
+ self.values = values
+ self.links = links
+
+ def to_xml(self):
+ dom = etree.Element("baseURLs")
+ dom.set(u"xmlns", "http://docs.openstack.org/identity/api/v2.0")
+
+ for t in self.values:
+ dom.append(t.to_dom())
+
+ for t in self.links:
+ dom.append(t.to_dom())
+
+ return etree.tostring(dom)
+
+ def to_json(self):
+ values = [t.to_dict()["baseURL"] for t in self.values]
+ links = [t.to_dict()["links"] for t in self.links]
+ return json.dumps({"baseURLs": {"values": values, "links": links}})
+
diff --git a/keystone/server.py b/keystone/server.py
index af9580ff..bb8ab145 100644
--- a/keystone/server.py
+++ b/keystone/server.py
@@ -549,7 +549,6 @@ class RolesController(wsgi.Controller):
req.environ['PATH_INFO'])
roles = service.get_roles(utils.get_auth_token(req),
marker, limit, url)
-
return utils.send_result(200, req, roles)
@utils.wrap_error
@@ -582,11 +581,47 @@ class RolesController(wsgi.Controller):
return utils.send_result(200, req, roleRefs)
+ @utils.wrap_error
def delete_role_ref(self, req, user_id, role_ref_id):
rval = service.delete_role_ref(utils.get_auth_token(req),
role_ref_id)
return utils.send_result(204, req, rval)
+class BaseURLsController(wsgi.Controller):
+ """
+ BaseURL Controller -
+ Controller for BaseURL related operations
+ """
+
+ def __init__(self, options):
+ self.options = options
+
+ @utils.wrap_error
+ def get_baseurls(self, req):
+ marker = None
+ if "marker" in req.GET:
+ marker = req.GET["marker"]
+
+ if "limit" in req.GET:
+ limit = req.GET["limit"]
+ else:
+ limit = 10
+
+ url = '%s://%s:%s%s' % (req.environ['wsgi.url_scheme'],
+ req.environ.get("SERVER_NAME"),
+ req.environ.get("SERVER_PORT"),
+ req.environ['PATH_INFO'])
+ baseURLs = service.get_baseurls(utils.get_auth_token(req),
+ marker, limit, url)
+ return utils.send_result(200, req, baseURLs)
+
+ @utils.wrap_error
+ def get_baseurl(self, req, baseURLId):
+ baseurl = service.get_baseurl(utils.get_auth_token(req), baseURLId)
+ return utils.send_result(200, req, baseurl)
+
+
+
class KeystoneAPI(wsgi.Router):
"""WSGI entry point for public Keystone API requests."""
@@ -806,7 +841,7 @@ class KeystoneAdminAPI(wsgi.Router):
action="delete_user_global_group",
conditions=dict(method=["DELETE"]))
- #Roles
+ #Roles and RoleRefs
roles_controller = RolesController(options)
mapper.connect("/v2.0/roles", controller=roles_controller,
action="get_roles", conditions=dict(method=["GET"]))
@@ -821,6 +856,13 @@ class KeystoneAdminAPI(wsgi.Router):
mapper.connect("/v2.0/users/{user_id}/roleRefs/{role_ref_id}",
controller=roles_controller, action="delete_role_ref",
conditions=dict(method=["DELETE"]))
+
+ #BaseURLs and BaseURLRefs
+ baseurls_controller = BaseURLsController(options)
+ mapper.connect("/v2.0/baseURLs", controller=baseurls_controller,
+ action="get_baseurls", conditions=dict(method=["GET"]))
+ mapper.connect("/v2.0/baseURLs/{baseURLId}", controller=baseurls_controller,
+ action="get_baseurl", conditions=dict(method=["GET"]))
# Miscellaneous Operations
diff --git a/test/unit/test_BaseURLs.py b/test/unit/test_BaseURLs.py
new file mode 100644
index 00000000..8d3f7ea7
--- /dev/null
+++ b/test/unit/test_BaseURLs.py
@@ -0,0 +1,136 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright (c) 2010-2011 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import httplib2
+import json
+from lxml import etree
+import os
+import sys
+sys.path.append(os.path.abspath(os.path.join(os.path.abspath(__file__),
+ '..', '..', '..', '..', 'keystone')))
+import unittest
+
+import test_common as utils
+from test_common import URL
+
+class BaseURLsTest(unittest.TestCase):
+ def setUp(self):
+ self.tenant = utils.get_tenant()
+ self.password = utils.get_password()
+ self.email = utils.get_email()
+ self.user = utils.get_user()
+ self.userdisabled = utils.get_userdisabled()
+ self.auth_token = utils.get_auth_token()
+ self.exp_auth_token = utils.get_exp_auth_token()
+ self.disabled_token = utils.get_disabled_token()
+ self.missing_token = utils.get_none_token()
+ self.invalid_token = utils.get_non_existing_token()
+ utils.create_tenant(self.tenant, str(self.auth_token))
+ utils.create_user(self.tenant, self.user, self.auth_token)
+ self.token = utils.get_token(self.user, 'secrete', self.tenant,
+ 'token')
+
+ def tearDown(self):
+ utils.delete_user(self.tenant, self.user, self.auth_token)
+ utils.delete_tenant(self.tenant, self.auth_token)
+
+class GetBaseURLsTest(BaseURLsTest):
+ def test_get_baseURLs(self):
+ header = httplib2.Http(".cache")
+ url = '%sbaseURLs' % (utils.URL)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ #verify content
+ obj = json.loads(content)
+ if not "baseURLs" in obj:
+ raise self.fail("Expecting BaseURLs")
+
+ def test_get_baseURLs_xml(self):
+ header = httplib2.Http(".cache")
+ url = '%sbaseURLs' % (utils.URL)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ #verify content
+ dom = etree.Element("root")
+ dom.append(etree.fromstring(content))
+ baseURLs = dom.find("{http://docs.openstack.org/identity/api/v2.0}" \
+ "baseURLs")
+ if baseURLs == None:
+ self.fail("Expecting BaseURLs")
+
+class GetBaseURLTest(BaseURLsTest):
+ def test_get_baseURL(self):
+ header = httplib2.Http(".cache")
+ url = '%sbaseURLs/%s' % (utils.URL, '1')
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ #verify content
+ obj = json.loads(content)
+ if not "baseURL" in obj:
+ raise self.fail("Expecting BaseURL")
+
+ def test_get_baseURL_xml(self):
+ header = httplib2.Http(".cache")
+ url = '%sbaseURLs/%s' % (utils.URL,'1')
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.auth_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(200, int(resp['status']))
+
+ #verify content
+ dom = etree.Element("root")
+ dom.append(etree.fromstring(content))
+ baseURL = dom.find("{http://docs.openstack.org/identity/api/v2.0}" \
+ "baseURL")
+ if baseURL == None:
+ self.fail("Expecting BaseURL")
+
+
+
+if __name__ == '__main__':
+ unittest.main() \ No newline at end of file
diff --git a/test/unit/test_common.py b/test/unit/test_common.py
index 126a3f62..beba7dfa 100644
--- a/test/unit/test_common.py
+++ b/test/unit/test_common.py
@@ -725,7 +725,6 @@ def create_role_ref(user_id, role_id, tenant_id, auth_token):
resp, content = header.request(url, "POST", body=json.dumps(body),
headers={"Content-Type": "application/json",
"X-Auth-Token": auth_token})
- print url, resp, content
return (resp, content)
def create_role_ref_xml(user_id, role_id, tenant_id, auth_token):
diff --git a/test/unit/test_roles.py b/test/unit/test_roles.py
index fa736f6a..10b16d27 100644
--- a/test/unit/test_roles.py
+++ b/test/unit/test_roles.py
@@ -164,8 +164,6 @@ class GetRoleTest(RolesTest):
if role_id != 'Admin':
self.fail("Not the expected Role")
-
-
def test_get_role_xml(self):
self.role = 'Admin'
header = httplib2.Http(".cache")
@@ -204,21 +202,136 @@ class GetRoleTest(RolesTest):
elif int(resp['status']) == 503:
self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
+
+ def test_get_role_xml_bad(self):
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, 'tenant_bad')
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.auth_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_role_expired_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.exp_auth_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
- def test_get_role_bad_xml(self):
+ def test_get_role_xml_using_expired_token(self):
+ self.role = 'Admin'
header = httplib2.Http(".cache")
- resp, content = utils.create_tenant(self.tenant, str(self.auth_token))
- url = '%sroles/%s' % (utils.URL, 'role_bad')
+ url = '%sroles/%s' % (utils.URL, self.role)
#test for Content-Type = application/json
resp, content = header.request(url, "GET", body='',
headers={"Content-Type": "application/xml",
- "X-Auth-Token": self.auth_token,
+ "X-Auth-Token": self.exp_auth_token,
"ACCEPT": "application/xml"})
if int(resp['status']) == 500:
self.fail('Identity Fault')
elif int(resp['status']) == 503:
self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_role_using_disabled_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.disabled_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_role_xml_using_disabled_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.disabled_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_role_using_missing_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.missing_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_role_xml_using_missing_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.missing_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_role_using_invalid_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": self.invalid_token})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
self.assertEqual(404, int(resp['status']))
+
+ def test_get_role_xml_using_invalid_token(self):
+ self.role = 'Admin'
+ header = httplib2.Http(".cache")
+ url = '%sroles/%s' % (utils.URL, self.role)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": self.invalid_token,
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
class CreateRoleRefTest(RolesTest):
def test_role_ref_create_json(self):
@@ -233,7 +346,36 @@ class CreateRoleRefTest(RolesTest):
resp, content = utils.create_role_ref_xml(self.user, 'Admin', self.tenant,
str(self.auth_token))
resp_val = int(resp['status'])
- self.assertEqual(201, resp_val)
+ self.assertEqual(201, resp_val)
+
+ def test_role_ref_create_json_using_expired_token(self):
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.exp_auth_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(403, resp_val)
+
+ def test_role_ref_create_json_using_disabled_token(self):
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.disabled_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(403, resp_val)
+
+ def test_role_ref_create_json_using_missing_token(self):
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.missing_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(401, resp_val)
+
+ def test_role_ref_create_json_using_invalid_token(self):
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.invalid_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(404, resp_val)
+
class GetRoleRefsTest(RolesTest):
def test_get_rolerefs(self):
@@ -280,7 +422,140 @@ class GetRoleRefsTest(RolesTest):
"roleRefs")
if roles == None:
self.fail("Expecting Role Refs")
-
+
+ def test_get_rolerefs_using_expired_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.exp_auth_token)})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_rolerefs_xml_using_expired_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/xml
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": str(self.exp_auth_token),
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_rolerefs_using_disabled_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.disabled_token)})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_rolerefs_xml_using_disabled_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/xml
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": str(self.disabled_token),
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(403, int(resp['status']))
+
+ def test_get_rolerefs_using_missing_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.missing_token)})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_rolerefs_xml_using_missing_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/xml
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": str(self.missing_token),
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(401, int(resp['status']))
+
+ def test_get_rolerefs_using_invalid_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/json
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.invalid_token)})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+ def test_get_rolerefs_xml_using_missing_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ url = '%susers/%s/roleRefs' % (URL, self.user)
+ #test for Content-Type = application/xml
+ resp, content = header.request(url, "GET", body='{}',
+ headers={"Content-Type": "application/xml",
+ "X-Auth-Token": str(self.invalid_token),
+ "ACCEPT": "application/xml"})
+ if int(resp['status']) == 500:
+ self.fail('Identity Fault')
+ elif int(resp['status']) == 503:
+ self.fail('Service Not Available')
+ self.assertEqual(404, int(resp['status']))
+
+
class DeleteRoleRefTest(RolesTest):
def test_delete_roleref(self):
header = httplib2.Http(".cache")
@@ -305,7 +580,102 @@ class DeleteRoleRefTest(RolesTest):
"X-Auth-Token": str(self.auth_token)})
resp_val = int(resp['status'])
self.assertEqual(204, resp_val)
- return (resp, content)
+
+ def test_delete_roleref_using_expired_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(201, resp_val)
+ obj = json.loads(content)
+ if not "roleRef" in obj:
+ raise fault.BadRequestFault("Expecting RoleRef")
+ roleRef = obj["roleRef"]
+ if not "id" in roleRef:
+ role_ref_id = None
+ else:
+ role_ref_id = roleRef["id"]
+ if role_ref_id is None:
+ raise fault.BadRequestFault("Expecting RoleRefId")
+ url = '%susers/%s/roleRefs/%s' % (URL, self.user, role_ref_id)
+ resp, content = header.request(url, "DELETE", body='',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.exp_auth_token)})
+ resp_val = int(resp['status'])
+ self.assertEqual(403, resp_val)
+
+ def test_delete_roleref_using_disabled_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(201, resp_val)
+ obj = json.loads(content)
+ if not "roleRef" in obj:
+ raise fault.BadRequestFault("Expecting RoleRef")
+ roleRef = obj["roleRef"]
+ if not "id" in roleRef:
+ role_ref_id = None
+ else:
+ role_ref_id = roleRef["id"]
+ if role_ref_id is None:
+ raise fault.BadRequestFault("Expecting RoleRefId")
+ url = '%susers/%s/roleRefs/%s' % (URL, self.user, role_ref_id)
+ resp, content = header.request(url, "DELETE", body='',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.disabled_token)})
+ resp_val = int(resp['status'])
+ self.assertEqual(403, resp_val)
+
+ def test_delete_roleref_using_missing_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(201, resp_val)
+ obj = json.loads(content)
+ if not "roleRef" in obj:
+ raise fault.BadRequestFault("Expecting RoleRef")
+ roleRef = obj["roleRef"]
+ if not "id" in roleRef:
+ role_ref_id = None
+ else:
+ role_ref_id = roleRef["id"]
+ if role_ref_id is None:
+ raise fault.BadRequestFault("Expecting RoleRefId")
+ url = '%susers/%s/roleRefs/%s' % (URL, self.user, role_ref_id)
+ resp, content = header.request(url, "DELETE", body='',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.missing_token)})
+ resp_val = int(resp['status'])
+ self.assertEqual(401, resp_val)
+
+ def test_delete_roleref_using_invalid_token(self):
+ header = httplib2.Http(".cache")
+ utils.add_user_json(self.tenant, self.user, self.auth_token)
+ resp, content = utils.create_role_ref(self.user, 'Admin', self.tenant,
+ str(self.auth_token))
+ resp_val = int(resp['status'])
+ self.assertEqual(201, resp_val)
+ obj = json.loads(content)
+ if not "roleRef" in obj:
+ raise fault.BadRequestFault("Expecting RoleRef")
+ roleRef = obj["roleRef"]
+ if not "id" in roleRef:
+ role_ref_id = None
+ else:
+ role_ref_id = roleRef["id"]
+ if role_ref_id is None:
+ raise fault.BadRequestFault("Expecting RoleRefId")
+ url = '%susers/%s/roleRefs/%s' % (URL, self.user, role_ref_id)
+ resp, content = header.request(url, "DELETE", body='',
+ headers={"Content-Type": "application/json",
+ "X-Auth-Token": str(self.invalid_token)})
+ resp_val = int(resp['status'])
+ self.assertEqual(404, resp_val)
if __name__ == '__main__':