summaryrefslogtreecommitdiffstats
path: root/ipaclient/remote_plugins/schema.py
diff options
context:
space:
mode:
authorJan Cholasta <jcholast@redhat.com>2016-06-22 13:27:25 +0200
committerJan Cholasta <jcholast@redhat.com>2016-06-27 16:42:42 +0200
commitf7cc15f0990ef2db57717a3c6a8e9db2c3dee951 (patch)
tree726cbd719163f0b2cd3614342887849704e9b892 /ipaclient/remote_plugins/schema.py
parent61987b66ba8dfcb0c683de64cbda77eb29f9f767 (diff)
downloadfreeipa-f7cc15f0990ef2db57717a3c6a8e9db2c3dee951.tar.gz
freeipa-f7cc15f0990ef2db57717a3c6a8e9db2c3dee951.tar.xz
freeipa-f7cc15f0990ef2db57717a3c6a8e9db2c3dee951.zip
schema: client-side code cleanup
Move client-side code scattered in global functions into neat classes. https://fedorahosted.org/freeipa/ticket/4739 Reviewed-By: David Kupka <dkupka@redhat.com>
Diffstat (limited to 'ipaclient/remote_plugins/schema.py')
-rw-r--r--ipaclient/remote_plugins/schema.py432
1 files changed, 190 insertions, 242 deletions
diff --git a/ipaclient/remote_plugins/schema.py b/ipaclient/remote_plugins/schema.py
index b944fb08f..98500384b 100644
--- a/ipaclient/remote_plugins/schema.py
+++ b/ipaclient/remote_plugins/schema.py
@@ -13,8 +13,8 @@ from ipaclient.plugins.rpcclient import rpcclient
from ipalib import parameters, plugable
from ipalib.frontend import Command, Method, Object
from ipalib.output import Output
-from ipalib.parameters import Bool, DefaultFrom, Flag, Password, Str
-from ipalib.text import ConcatenatedLazyText, _
+from ipalib.parameters import DefaultFrom, Flag, Password, Str
+from ipalib.text import _
from ipapython.dn import DN
from ipapython.dnsutil import DNSName
@@ -48,39 +48,6 @@ _PARAMS = {
class _SchemaCommand(Command):
- def __fix_default_from(self, param):
- api = self.api
- name = unicode(self.name)
- param_name = unicode(param.name)
- keys = param.default_from.keys
-
- if keys:
- def callback(*args):
- kw = dict(zip(keys, args))
- result = api.Command.command_defaults(
- name,
- params=[param_name],
- kw=kw,
- )['result']
- return result.get(param_name)
- else:
- def callback():
- result = api.Command.command_defaults(
- name,
- params=[param_name],
- )['result']
- return result.get(param_name)
-
- callback.__name__ = '{0}_{1}_default'.format(self.name, param.name)
-
- return param.clone(default_from=DefaultFrom(callback, *keys))
-
- def get_args(self):
- for arg in super(_SchemaCommand, self).get_args():
- if arg.default_from is not None:
- arg = self.__fix_default_from(arg)
- yield arg
-
def get_options(self):
skip = set()
for option in super(_SchemaCommand, self).get_options():
@@ -88,12 +55,6 @@ class _SchemaCommand(Command):
continue
if option.name in ('all', 'raw'):
skip.add(option.name)
- if option.default_from is not None:
- option = self.__fix_default_from(option)
- if (isinstance(option, Bool) and
- option.autofill and
- option.default is False):
- option = option.clone_retype(option.name, Flag)
yield option
@@ -182,260 +143,247 @@ class _SchemaMethod(Method, _SchemaCommand):
yield output_param
-def _nope():
+class _SchemaObject(Object):
pass
-def _create_param_convert_scalar(cls):
- def _convert_scalar(self, value, index=None):
- if isinstance(value, unicode):
- return value
- return super(cls, self)._convert_scalar(value)
+class _SchemaPlugin(object):
+ bases = None
+ schema_key = None
- return _convert_scalar
+ def __init__(self, name):
+ self.name = name
+ self.__class = None
+ def _create_default_from(self, api, name, keys):
+ cmd_name = self.name
-def _create_param(meta):
- type_name = str(meta['type'])
- sensitive = meta.get('sensitive', False)
+ def get_default(*args):
+ kw = dict(zip(keys, args))
+ result = api.Command.command_defaults(
+ unicode(cmd_name),
+ params=[unicode(name)],
+ kw=kw,
+ )['result']
+ return result.get(name)
- if type_name == 'str' and sensitive:
- cls = Password
- sensitive = False
- else:
- try:
- cls = _PARAMS[type_name]
- except KeyError:
- cls = Str
-
- kwargs = {}
- default = None
-
- for key, value in meta.items():
- if key in ('alwaysask',
- 'doc',
- 'label',
- 'multivalue',
- 'no_convert',
- 'option_group',
- 'required',
- 'sortorder'):
- kwargs[key] = value
- elif key in ('cli_metavar',
- 'cli_name'):
- kwargs[key] = str(value)
- elif key == 'confirm' and issubclass(cls, parameters.Password):
- kwargs[key] = value
- elif key == 'default':
- default = value
- elif key == 'default_from_param':
- kwargs['default_from'] = DefaultFrom(_nope,
- *(str(k) for k in value))
- elif key in ('exclude',
- 'include'):
- kwargs[key] = tuple(str(v) for v in value)
-
- if default is not None:
- tmp = cls(str(meta['name']), **dict(kwargs, no_convert=False))
- if tmp.multivalue:
- default = tuple(tmp._convert_scalar(d) for d in default)
+ if keys:
+ def callback(*args):
+ return get_default(*args)
else:
- default = tmp._convert_scalar(default[0])
- kwargs['default'] = default
+ def callback():
+ return get_default()
- if 'default' in kwargs or 'default_from' in kwargs:
- kwargs['autofill'] = not kwargs.pop('alwaysask', False)
+ callback.__name__ = '{0}_{1}_default'.format(cmd_name, name)
- param = cls(str(meta['name']), **kwargs)
+ return DefaultFrom(callback, *keys)
- if sensitive:
- object.__setattr__(param, 'password', True)
+ def _create_param(self, api, schema):
+ name = str(schema['name'])
+ type_name = str(schema['type'])
+ sensitive = schema.get('sensitive', False)
- return param
+ if type_name == 'str' and sensitive:
+ cls = Password
+ sensitive = False
+ elif (type_name == 'bool' and
+ 'default' in schema and
+ schema['default'] == [u'False']):
+ cls = Flag
+ del schema['default']
+ else:
+ try:
+ cls = _PARAMS[type_name]
+ except KeyError:
+ cls = Str
+
+ kwargs = {}
+ default = None
+
+ for key, value in schema.items():
+ if key in ('alwaysask',
+ 'doc',
+ 'label',
+ 'multivalue',
+ 'no_convert',
+ 'option_group',
+ 'required'):
+ kwargs[key] = value
+ elif key in ('cli_metavar',
+ 'cli_name'):
+ kwargs[key] = str(value)
+ elif key == 'confirm' and issubclass(cls, Password):
+ kwargs[key] = value
+ elif key == 'default':
+ default = value
+ elif key == 'default_from_param':
+ keys = tuple(str(k) for k in value)
+ kwargs['default_from'] = (
+ self._create_default_from(api, name, keys))
+ elif key in ('exclude',
+ 'include'):
+ kwargs[key] = tuple(str(v) for v in value)
+
+ if default is not None:
+ tmp = cls(name, **dict(kwargs, no_convert=False))
+ if tmp.multivalue:
+ default = tuple(tmp._convert_scalar(d) for d in default)
+ else:
+ default = tmp._convert_scalar(default[0])
+ kwargs['default'] = default
+ if 'default' in kwargs or 'default_from' in kwargs:
+ kwargs['autofill'] = not kwargs.pop('alwaysask', False)
-def _create_output(schema):
- if schema.get('multivalue', False):
- type_type = (tuple, list)
- if not schema.get('required', True):
- type_type = type_type + (type(None),)
- else:
- try:
- type_type = _TYPES[schema['type']]
- except KeyError:
- type_type = None
- else:
- if not schema.get('required', True):
- type_type = (type_type, type(None))
-
- kwargs = {}
- kwargs['type'] = type_type
-
- if 'doc' in schema:
- kwargs['doc'] = schema['doc']
-
- if schema.get('no_display', False):
- kwargs['flags'] = ('no_display',)
-
- return Output(str(schema['name']), **kwargs)
-
-
-def _create_command(schema):
- command = {}
- command['name'] = str(schema['name'])
- if 'doc' in schema:
- command['doc'] = ConcatenatedLazyText(schema['doc'])
- if 'topic_topic' in schema:
- command['topic'] = str(schema['topic_topic'])
- else:
- command['topic'] = None
- if 'obj_class' in schema:
- command['obj_name'] = str(schema['obj_class'])
- if 'attr_name' in schema:
- command['attr_name'] = str(schema['attr_name'])
- if 'exclude' in schema and u'cli' in schema['exclude']:
- command['NO_CLI'] = True
- command['takes_args'] = tuple(
- _create_param(s) for s in schema['params']
- if s.get('positional', s.get('required', True)))
- command['takes_options'] = tuple(
- _create_param(s) for s in schema['params']
- if not s.get('positional', s.get('required', True)))
- command['has_output'] = tuple(
- _create_output(m) for m in schema['output'])
-
- return command
-
-
-def _create_class(schema):
- cls = {}
- cls['name'] = str(schema['name'])
- if 'doc' in schema:
- cls['doc'] = ConcatenatedLazyText(schema['doc'])
- if 'topic_topic' in schema:
- cls['topic'] = str(schema['topic_topic'])
- else:
- cls['topic'] = None
- cls['takes_params'] = tuple(_create_param(s) for s in schema['params'])
-
- return cls
-
-
-class _LazySchemaPlugin(object):
- def __init__(self, base, schema):
- self.__base = base
- self.__schema = schema
- self.__class = None
- self.__module__ = None
+ param = cls(name, **kwargs)
- @property
- def name(self):
- return str(self.__schema['name'])
+ if sensitive:
+ object.__setattr__(param, 'password', True)
- @property
- def bases(self):
- if self.__base is Command:
- if 'obj_class' in self.__schema:
- return (_SchemaMethod,)
- else:
- return (_SchemaCommand,)
+ return param
+
+ def _create_class(self, api, schema):
+ class_dict = {}
+
+ class_dict['name'] = self.name
+ if 'doc' in schema:
+ class_dict['doc'] = schema['doc']
+ if 'topic_topic' in schema:
+ class_dict['topic'] = str(schema['topic_topic'])
else:
- return (self.__base,)
+ class_dict['topic'] = None
+
+ class_dict['takes_params'] = tuple(self._create_param(api, s)
+ for s in schema.get('params', []))
+
+ return self.name, self.bases, class_dict
def __call__(self, api):
if self.__class is None:
- if self.__base is Command:
- metaobject = _create_command(self.__schema)
- else:
- metaobject = _create_class(self.__schema)
- metaobject = type(self.name, self.bases, metaobject)
- metaobject.__module__ = self.__module__
- self.__class = metaobject
+ schema = api._schema[self.schema_key][self.name]
+ name, bases, class_dict = self._create_class(api, schema)
+ self.__class = type(name, bases, class_dict)
return self.__class(api)
-def _create_commands(schema):
- return [_LazySchemaPlugin(Command, s) for s in schema]
+class _SchemaCommandPlugin(_SchemaPlugin):
+ bases = (_SchemaCommand,)
+ schema_key = 'commands'
+
+ def _create_output(self, api, schema):
+ if schema.get('multivalue', False):
+ type_type = (tuple, list)
+ if not schema.get('required', True):
+ type_type = type_type + (type(None),)
+ else:
+ try:
+ type_type = _TYPES[schema['type']]
+ except KeyError:
+ type_type = None
+ else:
+ if not schema.get('required', True):
+ type_type = (type_type, type(None))
+
+ kwargs = {}
+ kwargs['type'] = type_type
+ if 'doc' in schema:
+ kwargs['doc'] = schema['doc']
-def _create_classes(schema):
- return [_LazySchemaPlugin(Object, s) for s in schema]
+ if schema.get('no_display', False):
+ kwargs['flags'] = ('no_display',)
+ return Output(str(schema['name']), **kwargs)
-def _create_topic(schema):
- topic = {}
- topic['name'] = str(schema['name'])
- if 'doc' in schema:
- topic['doc'] = ConcatenatedLazyText(schema['doc'])
- if 'topic_topic' in schema:
- topic['topic'] = str(schema['topic_topic'])
- else:
- topic['topic'] = None
+ def _create_class(self, api, schema):
+ name, bases, class_dict = (
+ super(_SchemaCommandPlugin, self)._create_class(api, schema))
- return topic
+ if 'obj_class' in schema or 'attr_name' in schema:
+ bases = (_SchemaMethod,)
+ if 'obj_class' in schema:
+ class_dict['obj_name'] = str(schema['obj_class'])
+ if 'attr_name' in schema:
+ class_dict['attr_name'] = str(schema['attr_name'])
+ if 'exclude' in schema and u'cli' in schema['exclude']:
+ class_dict['NO_CLI'] = True
-def _create_topics(schema):
- return [_create_topic(s) for s in schema]
+ args = set(str(s['name']) for s in schema['params']
+ if s.get('positional', s.get('required', True)))
+ class_dict['takes_args'] = tuple(
+ p for p in class_dict['takes_params'] if p.name in args)
+ class_dict['takes_options'] = tuple(
+ p for p in class_dict['takes_params'] if p.name not in args)
+ del class_dict['takes_params']
+
+ class_dict['has_output'] = tuple(
+ self._create_output(api, s) for s in schema['output'])
+
+ return name, bases, class_dict
+
+
+class _SchemaObjectPlugin(_SchemaPlugin):
+ bases = (_SchemaObject,)
+ schema_key = 'classes'
def get_package(api):
- package_name = '{}${}'.format(__name__, id(api))
- package_dir = '{}${}'.format(os.path.splitext(__file__)[0], id(api))
+ try:
+ schema = api._schema
+ except AttributeError:
+ client = rpcclient(api)
+ client.finalize()
+
+ client.connect(verbose=False)
+ try:
+ schema = client.forward(u'schema', version=u'2.170')['result']
+ finally:
+ client.disconnect()
+
+ for key in ('commands', 'classes', 'topics'):
+ schema[key] = {str(s.pop('name')): s for s in schema[key]}
+
+ object.__setattr__(api, '_schema', schema)
+
+ fingerprint = str(schema['fingerprint'])
+ package_name = '{}${}'.format(__name__, fingerprint)
+ package_dir = '{}${}'.format(os.path.splitext(__file__)[0], fingerprint)
try:
return sys.modules[package_name]
except KeyError:
pass
- client = rpcclient(api)
- client.finalize()
-
- client.connect(verbose=False)
- try:
- schema = client.forward(u'schema', version=u'2.170')['result']
- finally:
- client.disconnect()
-
- commands = _create_commands(schema['commands'])
- classes = _create_classes(schema['classes'])
- topics = _create_topics(schema['topics'])
-
package = types.ModuleType(package_name)
package.__file__ = os.path.join(package_dir, '__init__.py')
- package.modules = []
+ package.modules = ['plugins']
sys.modules[package_name] = package
- module_name = '.'.join((package_name, 'commands'))
+ module_name = '.'.join((package_name, 'plugins'))
module = types.ModuleType(module_name)
- module.__file__ = os.path.join(package_dir, 'commands.py')
+ module.__file__ = os.path.join(package_dir, 'plugins.py')
module.register = plugable.Registry()
- package.modules.append('commands')
+ for key, plugin_cls in (('commands', _SchemaCommandPlugin),
+ ('classes', _SchemaObjectPlugin)):
+ for name in schema[key]:
+ plugin = plugin_cls(name)
+ plugin = module.register()(plugin)
+ setattr(module, name, plugin)
sys.modules[module_name] = module
- for command in commands:
- command.__module__ = module_name
- command = module.register()(command)
- setattr(module, command.name, command)
-
- for cls in classes:
- cls.__module__ = module_name
- cls = module.register()(cls)
- setattr(module, cls.name, command)
-
- for topic in topics:
- name = topic.pop('name')
+ for name, topic in six.iteritems(schema['topics']):
module_name = '.'.join((package_name, name))
try:
module = sys.modules[module_name]
except KeyError:
module = sys.modules[module_name] = types.ModuleType(module_name)
module.__file__ = os.path.join(package_dir, '{}.py'.format(name))
- module.__dict__.update(topic)
- try:
- module.__doc__ = module.doc
- except AttributeError:
- pass
+ module.__doc__ = topic.get('doc')
+ if 'topic_topic' in topic:
+ module.topic = str(topic['topic_topic'])
+ else:
+ module.topic = None
return package