summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortermie <github@anarkystic.com>2012-01-24 23:01:51 -0800
committertermie <github@anarkystic.com>2012-01-24 23:01:51 -0800
commitd4f2bf5fdefca433ee81075dd28be6f1b7387b50 (patch)
tree38da349f91cf0f4fd02ab4c6030f364cb17455df
parent608b9a270bf2bdc017e02d2575669ec62d0645e7 (diff)
downloadkeystone-d4f2bf5fdefca433ee81075dd28be6f1b7387b50.tar.gz
keystone-d4f2bf5fdefca433ee81075dd28be6f1b7387b50.tar.xz
keystone-d4f2bf5fdefca433ee81075dd28be6f1b7387b50.zip
add a bunch of basic tests for the cli
-rw-r--r--keystone/cli.py146
-rw-r--r--keystone/common/wsgi.py1
-rw-r--r--keystone/test.py1
-rw-r--r--tests/test_cli.py61
-rw-r--r--tests/test_keystoneclient.py80
5 files changed, 249 insertions, 40 deletions
diff --git a/keystone/cli.py b/keystone/cli.py
index 814ef513..0469ab3a 100644
--- a/keystone/cli.py
+++ b/keystone/cli.py
@@ -1,8 +1,10 @@
from __future__ import absolute_import
+import json
import logging
import os
import sys
+import StringIO
import textwrap
import cli.app
@@ -19,7 +21,7 @@ config.register_cli_str('endpoint',
default='http://localhost:$admin_port/v2.0',
#group='ks',
conf=CONF)
-config.register_cli_str('auth_token',
+config.register_cli_str('auth-token',
default='$admin_token',
#group='ks',
help='asdasd',
@@ -113,7 +115,29 @@ class ClientCommand(BaseApp):
if CONF.id_only and getattr(resp, 'id'):
print resp.id
return
- print resp
+
+ if resp is None:
+ return
+
+ # NOTE(termie): this is ugly but it is mostly because the keystoneclient
+ # code doesn't give us very serializable instance objects
+ if type(resp) in [type(list()), type(tuple())]:
+ o = []
+ for r in resp:
+ d = {}
+ for k, v in sorted(r.__dict__.iteritems()):
+ if k[0] == '_' or k == 'manager':
+ continue
+ d[k] = v
+ o.append(d)
+ else:
+ o = {}
+ for k, v in sorted(resp.__dict__.iteritems()):
+ if k[0] == '_' or k == 'manager':
+ continue
+ o[k] = v
+
+ print json.dumps(o)
def print_help(self):
CONF.set_usage(CONF.usage.replace(
@@ -175,6 +199,122 @@ CMDS = {'db_sync': DbSync,
}
+class CommandLineGenerator(object):
+ """A keystoneclient lookalike to generate keystone-manage commands.
+
+ One would use it like so:
+
+ >>> gen = CommandLineGenerator(id_only=None)
+ >>> cl = gen.ec2.create(user_id='foo', tenant_id='foo')
+ >>> cl.to_argv()
+ ... ['keystone-manage',
+ '--id-only',
+ 'ec2',
+ 'create',
+ 'user_id=foo',
+ 'tenant_id=foo']
+
+ """
+
+ cmd = 'keystone-manage'
+
+ def __init__(self, cmd=None, execute=False, **kw):
+ if cmd:
+ self.cmd = cmd
+ self.flags = kw
+ self.execute = execute
+
+ def __getattr__(self, key):
+ return _Manager(self, key)
+
+
+class _Manager(object):
+ def __init__(self, parent, name):
+ self.parent = parent
+ self.name = name
+
+ def __getattr__(self, key):
+ return _CommandLine(cmd=self.parent.cmd,
+ flags=self.parent.flags,
+ manager=self.name,
+ method=key,
+ execute=self.parent.execute)
+
+
+class _CommandLine(object):
+ def __init__(self, cmd, flags, manager, method, execute=False):
+ self.cmd = cmd
+ self.flags = flags
+ self.manager = manager
+ self.method = method
+ self.execute = execute
+ self.kw = {}
+
+ def __call__(self, **kw):
+ self.kw = kw
+ if self.execute:
+ logging.debug('generated cli: %s', str(self))
+ out = StringIO.StringIO()
+ old_out = sys.stdout
+ sys.stdout = out
+ try:
+ main(self.to_argv())
+ except SystemExit as e:
+ pass
+ finally:
+ sys.stdout = old_out
+ rv = out.getvalue().strip().split('\n')[-1]
+ try:
+ loaded = json.loads(rv)
+ if type(loaded) in [type(list()), type(tuple())]:
+ return [DictWrapper(**x) for x in loaded]
+ elif type(loaded) is type(dict()):
+ return DictWrapper(**loaded)
+ except Exception:
+ logging.exception('Could not parse JSON: %s', rv)
+ return rv
+ return self
+
+ def __flags(self):
+ o = []
+ for k, v in self.flags.iteritems():
+ k = k.replace('_', '-')
+ if v is None:
+ o.append('--%s' % k)
+ else:
+ o.append('--%s=%s' % (k, str(v)))
+ return o
+
+ def __manager(self):
+ if self.manager.endswith('s'):
+ return self.manager[:-1]
+ return self.manager
+
+ def __kw(self):
+ o = []
+ for k, v in self.kw.iteritems():
+ o.append('%s=%s' % (k, str(v)))
+ return o
+
+ def to_argv(self):
+ return ([self.cmd]
+ + self.__flags()
+ + [self.__manager(), self.method]
+ + self.__kw())
+
+ def __str__(self):
+ args = self.to_argv()
+ return ' '.join(args[:1] + ['"%s"' % x for x in args[1:]])
+
+
+class DictWrapper(dict):
+ def __getattr__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ raise AttributeError(key)
+
+
def print_commands(cmds):
print
print "Available commands:"
@@ -195,7 +335,9 @@ def run(cmd, args):
def main(argv=None, config_files=None):
+ CONF.reset()
args = CONF(config_files=config_files, args=argv)
+
if len(args) < 2:
CONF.print_help()
print_commands(CMDS)
diff --git a/keystone/common/wsgi.py b/keystone/common/wsgi.py
index fafa9694..34caec2c 100644
--- a/keystone/common/wsgi.py
+++ b/keystone/common/wsgi.py
@@ -196,7 +196,6 @@ class Application(BaseApplication):
creds = user_token_ref['metadata'].copy()
creds['user_id'] = user_token_ref['user'].get('id')
creds['tenant_id'] = user_token_ref['tenant'].get('id')
- print creds
# Accept either is_admin or the admin role
assert self.policy_api.can_haz(context,
('is_admin:1', 'roles:admin'),
diff --git a/keystone/test.py b/keystone/test.py
index 9e1137a7..42655142 100644
--- a/keystone/test.py
+++ b/keystone/test.py
@@ -146,7 +146,6 @@ class TestCase(unittest.TestCase):
for tenant_id in tenants:
self.identity_api.add_user_to_tenant(tenant_id, user['id'])
setattr(self, 'user_%s' % user['id'], user_copy)
- print user_copy
for role in fixtures.ROLES:
rv = self.identity_api.create_role(role['id'], role)
diff --git a/tests/test_cli.py b/tests/test_cli.py
new file mode 100644
index 00000000..82fde86d
--- /dev/null
+++ b/tests/test_cli.py
@@ -0,0 +1,61 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+import nose.exc
+
+from keystone import config
+from keystone import test
+from keystone.common import utils
+
+import default_fixtures
+import test_keystoneclient
+
+CONF = config.CONF
+KEYSTONECLIENT_REPO = 'git://github.com/openstack/python-keystoneclient.git'
+
+
+class CliMasterTestCase(test_keystoneclient.KcMasterTestCase):
+ def setUp(self):
+ super(CliMasterTestCase, self).setUp()
+ from keystone import cli
+ self.cli = cli
+
+ def get_client(self, user_ref=None, tenant_ref=None):
+ if user_ref is None:
+ user_ref = self.user_foo
+ if tenant_ref is None:
+ for user in default_fixtures.USERS:
+ if user['id'] == user_ref['id']:
+ tenant_id = user['tenants'][0]
+ else:
+ tenant_id = tenant_ref['id']
+
+ cl = self._client(username=user_ref['name'],
+ password=user_ref['password'],
+ tenant_id=tenant_id)
+ gen = self.cli.CommandLineGenerator(
+ cmd=test.rootdir('bin', 'keystone-manage'),
+ execute=True,
+ auth_token=cl.auth_token,
+ endpoint=cl.management_url)
+ gen.auth_token = cl.auth_token
+ gen.management_url = cl.management_url
+ return gen
+
+ def test_authenticate_tenant_id_and_tenants(self):
+ raise nose.exc.SkipTest('N/A')
+
+ def test_authenticate_token_no_tenant(self):
+ raise nose.exc.SkipTest('N/A')
+
+ def test_authenticate_token_tenant_id(self):
+ raise nose.exc.SkipTest('N/A')
+
+ def test_authenticate_token_tenant_name(self):
+ raise nose.exc.SkipTest('N/A')
+
+ def test_tenant_create_update_and_delete(self):
+ raise nose.exc.SkipTest('cli does not support booleans yet')
+ def test_invalid_password(self):
+ raise nose.exc.SkipTest('N/A')
+
+ def test_user_create_update_delete(self):
+ raise nose.exc.SkipTest('cli does not support booleans yet')
diff --git a/tests/test_keystoneclient.py b/tests/test_keystoneclient.py
index d38313d3..6265c175 100644
--- a/tests/test_keystoneclient.py
+++ b/tests/test_keystoneclient.py
@@ -113,7 +113,7 @@ class KcMasterTestCase(CompatTestCase):
def test_endpoints(self):
client = self.get_client()
token = client.auth_token
- endpoints = client.tokens.endpoints(token)
+ endpoints = client.tokens.endpoints(token=token)
# FIXME(ja): this test should require the "keystone:admin" roled
# (probably the role set via --keystone_admin_role flag)
@@ -125,16 +125,16 @@ class KcMasterTestCase(CompatTestCase):
test_tenant = 'new_tenant'
client = self.get_client()
- tenant = client.tenants.create(test_tenant,
+ tenant = client.tenants.create(tenant_name=test_tenant,
description="My new tenant!",
enabled=True)
self.assertEquals(tenant.name, test_tenant)
- tenant = client.tenants.get(tenant.id)
+ tenant = client.tenants.get(tenant_id=tenant.id)
self.assertEquals(tenant.name, test_tenant)
# TODO(devcamcar): update gives 404. why?
- tenant = client.tenants.update(tenant.id,
+ tenant = client.tenants.update(tenant_id=tenant.id,
tenant_name='new_tenant2',
enabled=False,
description='new description')
@@ -142,7 +142,7 @@ class KcMasterTestCase(CompatTestCase):
self.assertFalse(tenant.enabled)
self.assertEquals(tenant.description, 'new description')
- client.tenants.delete(tenant.id)
+ client.tenants.delete(tenant=tenant.id)
self.assertRaises(client_exceptions.NotFound, client.tenants.get,
tenant.id)
@@ -153,25 +153,26 @@ class KcMasterTestCase(CompatTestCase):
def test_tenant_add_and_remove_user(self):
client = self.get_client()
- client.roles.add_user_to_tenant(self.tenant_baz['id'],
- self.user_foo['id'],
- self.role_useless['id'])
+ client.roles.add_user_to_tenant(tenant_id=self.tenant_baz['id'],
+ user_id=self.user_foo['id'],
+ role_id=self.role_useless['id'])
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] in
[x.id for x in tenant_refs])
# get the "role_refs" so we get the proper id, this is how the clients
# do it
- roleref_refs = client.roles.get_user_role_refs(self.user_foo['id'])
+ roleref_refs = client.roles.get_user_role_refs(
+ user_id=self.user_foo['id'])
for roleref_ref in roleref_refs:
if (roleref_ref.roleId == self.role_useless['id'] and
roleref_ref.tenantId == self.tenant_baz['id']):
# use python's scope fall through to leave roleref_ref set
break
- client.roles.remove_user_from_tenant(self.tenant_baz['id'],
- self.user_foo['id'],
- roleref_ref.id)
+ client.roles.remove_user_from_tenant(tenant_id=self.tenant_baz['id'],
+ user_id=self.user_foo['id'],
+ role_id=roleref_ref.id)
tenant_refs = client.tenants.list()
self.assert_(self.tenant_baz['id'] not in
@@ -194,17 +195,19 @@ class KcMasterTestCase(CompatTestCase):
test_username = 'new_user'
client = self.get_client()
- user = client.users.create(test_username, 'password', 'user1@test.com')
+ user = client.users.create(name=test_username,
+ password='password',
+ email='user1@test.com')
self.assertEquals(user.name, test_username)
- user = client.users.get(user.id)
+ user = client.users.get(user=user.id)
self.assertEquals(user.name, test_username)
- user = client.users.update_email(user, 'user2@test.com')
+ user = client.users.update_email(user=user, email='user2@test.com')
self.assertEquals(user.email, 'user2@test.com')
# NOTE(termie): update_enabled doesn't return anything, probably a bug
- client.users.update_enabled(user, False)
+ client.users.update_enabled(user=user, enabled=False)
user = client.users.get(user.id)
self.assertFalse(user.enabled)
@@ -214,12 +217,12 @@ class KcMasterTestCase(CompatTestCase):
password='password')
client.users.update_enabled(user, True)
- user = client.users.update_password(user, 'password2')
+ user = client.users.update_password(user=user, password='password2')
test_client = self._client(username=test_username,
password='password2')
- user = client.users.update_tenant(user, 'bar')
+ user = client.users.update_tenant(user=user, tenant='bar')
# TODO(ja): once keystonelight supports default tenant
# when you login without specifying tenant, the
# token should be scoped to tenant 'bar'
@@ -237,12 +240,12 @@ class KcMasterTestCase(CompatTestCase):
def test_user_get(self):
client = self.get_client()
- user = client.users.get(self.user_foo['id'])
+ user = client.users.get(user=self.user_foo['id'])
self.assertRaises(AttributeError, lambda: user.password)
def test_role_get(self):
client = self.get_client()
- role = client.roles.get('keystone_admin')
+ role = client.roles.get(role='keystone_admin')
self.assertEquals(role.id, 'keystone_admin')
def test_role_create_and_delete(self):
@@ -250,16 +253,16 @@ class KcMasterTestCase(CompatTestCase):
test_role = 'new_role'
client = self.get_client()
- role = client.roles.create(test_role)
+ role = client.roles.create(name=test_role)
self.assertEquals(role.name, test_role)
- role = client.roles.get(role)
+ role = client.roles.get(role=role.id)
self.assertEquals(role.name, test_role)
- client.roles.delete(role)
+ client.roles.delete(role=role.id)
self.assertRaises(client_exceptions.NotFound, client.roles.get,
- test_role)
+ role=test_role)
def test_role_list(self):
client = self.get_client()
@@ -269,25 +272,26 @@ class KcMasterTestCase(CompatTestCase):
def test_roles_get_by_user(self):
client = self.get_client()
- roles = client.roles.get_user_role_refs('foo')
+ roles = client.roles.get_user_role_refs(user_id='foo')
self.assertTrue(len(roles) > 0)
def test_ec2_credential_crud(self):
client = self.get_client()
- creds = client.ec2.list(self.user_foo['id'])
+ creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [])
- cred = client.ec2.create(self.user_foo['id'], self.tenant_bar['id'])
- creds = client.ec2.list(self.user_foo['id'])
+ cred = client.ec2.create(user_id=self.user_foo['id'],
+ tenant_id=self.tenant_bar['id'])
+ creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [cred])
- got = client.ec2.get(self.user_foo['id'], cred.access)
+ got = client.ec2.get(user_id=self.user_foo['id'], access=cred.access)
self.assertEquals(cred, got)
# FIXME(ja): need to test ec2 validation here
- client.ec2.delete(self.user_foo['id'], cred.access)
- creds = client.ec2.list(self.user_foo['id'])
+ client.ec2.delete(user_id=self.user_foo['id'], access=cred.access)
+ creds = client.ec2.list(user_id=self.user_foo['id'])
self.assertEquals(creds, [])
def test_ec2_credentials_list_unauthorized_user(self):
@@ -329,20 +333,24 @@ class KcMasterTestCase(CompatTestCase):
test_service = 'new_service'
client = self.get_client()
- service = client.services.create(test_service, 'test', 'test')
+ service = client.services.create(name=test_service,
+ service_type='test',
+ description='test')
self.assertEquals(service.name, test_service)
- service = client.services.get(service.id)
+ service = client.services.get(id=service.id)
self.assertEquals(service.name, test_service)
- client.services.delete(service.id)
+ client.services.delete(id=service.id)
self.assertRaises(client_exceptions.NotFound, client.services.get,
- service.id)
+ id=service.id)
def test_service_list(self):
client = self.get_client()
test_service = 'new_service'
- service = client.services.create(test_service, 'test', 'test')
+ service = client.services.create(name=test_service,
+ service_type='test',
+ description='test')
services = client.services.list()
# TODO(devcamcar): This assert should be more specific.
self.assertTrue(len(services) > 0)