diff options
-rw-r--r-- | API.txt | 8 | ||||
-rw-r--r-- | VERSION | 2 | ||||
-rwxr-xr-x | ipa-client/ipa-install/ipa-client-install | 29 | ||||
-rw-r--r-- | ipalib/parameters.py | 2 | ||||
-rw-r--r-- | ipalib/plugins/host.py | 33 | ||||
-rw-r--r-- | ipalib/plugins/user.py | 26 | ||||
-rw-r--r-- | ipalib/util.py | 56 | ||||
-rw-r--r-- | ipapython/ipautil.py | 20 | ||||
-rw-r--r-- | ipapython/ssh.py | 199 | ||||
-rw-r--r-- | tests/test_ipapython/test_ssh.py | 76 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_host_plugin.py | 42 | ||||
-rw-r--r-- | tests/test_xmlrpc/test_user_plugin.py | 61 |
12 files changed, 464 insertions, 90 deletions
@@ -1669,7 +1669,7 @@ option: Str('userpassword', attribute=True, cli_name='password', multivalue=Fals option: Flag('random', attribute=False, autofill=True, cli_name='random', default=False, multivalue=False, required=False) option: Bytes('usercertificate', attribute=True, cli_name='certificate', multivalue=False, required=False) option: Str('macaddress', attribute=True, cli_name='macaddress', csv=True, multivalue=True, pattern='^([a-fA-F0-9]{2}[:|\\-]?){5}[a-fA-F0-9]{2}$', required=False) -option: Bytes('ipasshpubkey', attribute=True, cli_name='sshpubkey', multivalue=True, required=False) +option: Str('ipasshpubkey', attribute=True, cli_name='sshpubkey', csv=True, multivalue=True, required=False) option: Str('setattr*', cli_name='setattr', exclude='webui') option: Str('addattr*', cli_name='addattr', exclude='webui') option: Flag('force', autofill=True, default=False) @@ -1754,7 +1754,7 @@ option: Str('userpassword', attribute=True, autofill=False, cli_name='password', option: Flag('random', attribute=False, autofill=True, cli_name='random', default=False, multivalue=False, required=False) option: Bytes('usercertificate', attribute=True, autofill=False, cli_name='certificate', multivalue=False, required=False) option: Str('macaddress', attribute=True, autofill=False, cli_name='macaddress', csv=True, multivalue=True, pattern='^([a-fA-F0-9]{2}[:|\\-]?){5}[a-fA-F0-9]{2}$', required=False) -option: Bytes('ipasshpubkey', attribute=True, autofill=False, cli_name='sshpubkey', multivalue=True, required=False) +option: Str('ipasshpubkey', attribute=True, autofill=False, cli_name='sshpubkey', csv=True, multivalue=True, required=False) option: Str('setattr*', cli_name='setattr', exclude='webui') option: Str('addattr*', cli_name='addattr', exclude='webui') option: Str('delattr*', cli_name='delattr', exclude='webui') @@ -3291,7 +3291,7 @@ option: Str('title', attribute=True, cli_name='title', multivalue=False, require option: Str('manager', attribute=True, cli_name='manager', multivalue=False, required=False) option: Str('carlicense', attribute=True, cli_name='carlicense', multivalue=False, required=False) option: Bool('nsaccountlock', attribute=True, cli_name='nsaccountlock', multivalue=False, required=False) -option: Bytes('ipasshpubkey', attribute=True, cli_name='sshpubkey', multivalue=True, required=False) +option: Str('ipasshpubkey', attribute=True, cli_name='sshpubkey', csv=True, multivalue=True, required=False) option: Str('setattr*', cli_name='setattr', exclude='webui') option: Str('addattr*', cli_name='addattr', exclude='webui') option: Flag('noprivate', autofill=True, cli_name='noprivate', default=False) @@ -3400,7 +3400,7 @@ option: Str('title', attribute=True, autofill=False, cli_name='title', multivalu option: Str('manager', attribute=True, autofill=False, cli_name='manager', multivalue=False, required=False) option: Str('carlicense', attribute=True, autofill=False, cli_name='carlicense', multivalue=False, required=False) option: Bool('nsaccountlock', attribute=True, autofill=False, cli_name='nsaccountlock', multivalue=False, required=False) -option: Bytes('ipasshpubkey', attribute=True, autofill=False, cli_name='sshpubkey', multivalue=True, required=False) +option: Str('ipasshpubkey', attribute=True, autofill=False, cli_name='sshpubkey', csv=True, multivalue=True, required=False) option: Str('setattr*', cli_name='setattr', exclude='webui') option: Str('addattr*', cli_name='addattr', exclude='webui') option: Str('delattr*', cli_name='delattr', exclude='webui') @@ -79,4 +79,4 @@ IPA_DATA_VERSION=20100614120000 # # ######################################################## IPA_API_VERSION_MAJOR=2 -IPA_API_VERSION_MINOR=42 +IPA_API_VERSION_MINOR=43 diff --git a/ipa-client/ipa-install/ipa-client-install b/ipa-client/ipa-install/ipa-client-install index d87fcc2a6..03a8bd3e8 100755 --- a/ipa-client/ipa-install/ipa-client-install +++ b/ipa-client/ipa-install/ipa-client-install @@ -29,7 +29,6 @@ try: from ipapython.ipa_log_manager import * import tempfile import getpass - from base64 import b64decode from ipaclient import ipadiscovery import ipaclient.ipachangeconf import ipaclient.ntpconf @@ -42,6 +41,7 @@ try: from ipapython.config import IPAOptionParser from ipalib import api, errors from ipapython.dn import DN + from ipapython.ssh import SSHPublicKey import SSSDConfig from ConfigParser import RawConfigParser from optparse import SUPPRESS_HELP, OptionGroup @@ -1112,29 +1112,23 @@ def update_ssh_keys(server, hostname, ssh_dir, create_sshfp): continue for line in f: - line = line[:-1] - if line.startswith('#'): - continue - parts = line.split() - if len(parts) < 2: - continue - try: - pubkey = b64decode(parts[1]) - except TypeError: + line = line[:-1].lstrip() + if not line or line.startswith('#'): continue try: - algo, data, fp = ipautil.decode_ssh_pubkey(pubkey) - except ValueError: - continue - if parts[0] != algo: + pubkey = SSHPublicKey(line) + except ValueError, UnicodeDecodeError: continue root_logger.info("Adding SSH public key from %s", filename) - pubkeys.append(unicode(parts[1])) + pubkeys.append(pubkey) f.close() try: - result = api.Command['host_mod'](unicode(hostname), ipasshpubkey=pubkeys, updatedns=False) + result = api.Command['host_mod'](unicode(hostname), + ipasshpubkey=[pk.openssh() for pk in pubkeys], + updatedns=False + ) except errors.EmptyModlist: pass except StandardError, e: @@ -1148,8 +1142,7 @@ def update_ssh_keys(server, hostname, ssh_dir, create_sshfp): update_txt = 'zone %s.\nupdate delete %s. IN SSHFP\nsend\n' % (zone, hostname) for pubkey in pubkeys: - pubkey = b64decode(pubkey) - sshfp = ipautil.make_sshfp(pubkey) + sshfp = pubkey.fingerprint_dns_sha1() if sshfp is not None: update_txt += 'update add %s. %s IN SSHFP %s\n' % (hostname, ttl, sshfp) update_txt += 'send\n' diff --git a/ipalib/parameters.py b/ipalib/parameters.py index 13f04b454..53756a80a 100644 --- a/ipalib/parameters.py +++ b/ipalib/parameters.py @@ -772,8 +772,6 @@ class Param(ReadOnly): This method is called once for each value in a multivalue. """ - if type(value) is not unicode: - return value if self.normalizer is None: return value try: diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py index 91b3ce677..e1c07b53b 100644 --- a/ipalib/plugins/host.py +++ b/ipalib/plugins/host.py @@ -39,8 +39,10 @@ from ipalib.plugins.dns import get_reverse_zone from ipalib import _, ngettext from ipalib import x509 from ipalib.request import context -from ipalib.util import validate_sshpubkey, output_sshpubkey -from ipapython.ipautil import ipa_generate_password, CheckedIPAddress, make_sshfp +from ipalib.util import (normalize_sshpubkey, validate_sshpubkey_no_options, + convert_sshpubkey_post) +from ipapython.ipautil import ipa_generate_password, CheckedIPAddress +from ipapython.ssh import SSHPublicKey from ipapython.dn import DN __doc__ = _(""" @@ -131,7 +133,10 @@ def update_sshfp_record(zone, record, entry_attrs): pubkeys = entry_attrs['ipasshpubkey'] or () sshfps=[] for pubkey in pubkeys: - sshfp = unicode(make_sshfp(pubkey)) + try: + sshfp = SSHPublicKey(pubkey).fingerprint_dns_sha1() + except ValueError, UnicodeDecodeError: + continue if sshfp is not None: sshfps.append(sshfp) @@ -180,6 +185,9 @@ host_output_params = ( Str('managedby', label=_('Failed managedby'), ), + Str('sshpubkeyfp*', + label=_('SSH public key fingerprint'), + ), ) def validate_ipaddr(ugettext, ipaddr): @@ -216,7 +224,6 @@ class host(LDAPObject): 'fqdn', 'description', 'l', 'nshostlocation', 'krbprincipalname', 'nshardwareplatform', 'nsosversion', 'usercertificate', 'memberof', 'managedby', 'memberindirect', 'memberofindirect', 'macaddress', - 'sshpubkeyfp', ] uuid_attribute = 'ipauniqueid' attribute_members = { @@ -303,15 +310,13 @@ class host(LDAPObject): label=_('MAC address'), doc=_('Hardware MAC address(es) on this host'), ), - Bytes('ipasshpubkey*', validate_sshpubkey, + Str('ipasshpubkey*', validate_sshpubkey_no_options, cli_name='sshpubkey', - label=_('Base-64 encoded SSH public key'), + label=_('SSH public key'), + normalizer=normalize_sshpubkey, + csv=True, flags=['no_search'], ), - Str('sshpubkeyfp*', - label=_('SSH public key fingerprint'), - flags=['virtual_attribute', 'no_create', 'no_update', 'no_search'], - ), ) def get_dn(self, *keys, **options): @@ -472,7 +477,7 @@ class host_add(LDAPCreate): # fetched anywhere. entry_attrs['has_keytab'] = False - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn @@ -717,7 +722,7 @@ class host_mod(LDAPUpdate): self.obj.suppress_netgroup_memberof(entry_attrs) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn @@ -802,7 +807,7 @@ class host_find(LDAPSearch): if options.get('all', False): entry_attrs['managing'] = self.obj.get_managed_hosts(entry[0]) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return truncated @@ -836,7 +841,7 @@ class host_show(LDAPRetrieve): self.obj.suppress_netgroup_memberof(entry_attrs) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn diff --git a/ipalib/plugins/user.py b/ipalib/plugins/user.py index 3f0050917..84a63dfa6 100644 --- a/ipalib/plugins/user.py +++ b/ipalib/plugins/user.py @@ -30,7 +30,8 @@ from ipalib import output from ipapython.ipautil import ipa_generate_password from ipapython.ipavalidate import Email import posixpath -from ipalib.util import validate_sshpubkey, output_sshpubkey +from ipalib.util import (normalize_sshpubkey, validate_sshpubkey, + convert_sshpubkey_post) if api.env.in_server and api.env.context in ['lite', 'server']: from ipaserver.plugins.ldap2 import ldap2 import os @@ -86,6 +87,9 @@ user_output_params = ( Flag('has_keytab', label=_('Kerberos keys available'), ), + Str('sshpubkeyfp*', + label=_('SSH public key fingerprint'), + ), ) status_output_params = ( @@ -200,7 +204,7 @@ class user(LDAPObject): 'uid', 'givenname', 'sn', 'homedirectory', 'loginshell', 'uidnumber', 'gidnumber', 'mail', 'ou', 'telephonenumber', 'title', 'memberof', 'nsaccountlock', - 'memberofindirect', 'sshpubkeyfp', + 'memberofindirect', ] search_display_attributes = [ 'uid', 'givenname', 'sn', 'homedirectory', 'loginshell', @@ -357,15 +361,13 @@ class user(LDAPObject): label=_('Account disabled'), flags=['no_option'], ), - Bytes('ipasshpubkey*', validate_sshpubkey, + Str('ipasshpubkey*', validate_sshpubkey, cli_name='sshpubkey', - label=_('Base-64 encoded SSH public key'), + label=_('SSH public key'), + normalizer=normalize_sshpubkey, + csv=True, flags=['no_search'], ), - Str('sshpubkeyfp*', - label=_('SSH public key fingerprint'), - flags=['virtual_attribute', 'no_create', 'no_update', 'no_search'], - ), ) def _normalize_and_validate_email(self, email, config=None): @@ -567,7 +569,7 @@ class user_add(LDAPCreate): self.obj.get_password_attributes(ldap, dn, entry_attrs) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn @@ -636,7 +638,7 @@ class user_mod(LDAPUpdate): convert_nsaccountlock(entry_attrs) self.obj._convert_manager(entry_attrs, **options) self.obj.get_password_attributes(ldap, dn, entry_attrs) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn api.register(user_mod) @@ -678,7 +680,7 @@ class user_find(LDAPSearch): self.obj._convert_manager(attrs, **options) self.obj.get_password_attributes(ldap, dn, attrs) convert_nsaccountlock(attrs) - output_sshpubkey(ldap, dn, attrs) + convert_sshpubkey_post(ldap, dn, attrs) return truncated msg_summary = ngettext( @@ -698,7 +700,7 @@ class user_show(LDAPRetrieve): convert_nsaccountlock(entry_attrs) self.obj._convert_manager(entry_attrs, **options) self.obj.get_password_attributes(ldap, dn, entry_attrs) - output_sshpubkey(ldap, dn, entry_attrs) + convert_sshpubkey_post(ldap, dn, entry_attrs) return dn api.register(user_show) diff --git a/ipalib/util.py b/ipalib/util.py index 155d93294..ca71e78db 100644 --- a/ipalib/util.py +++ b/ipalib/util.py @@ -34,7 +34,7 @@ from dns.exception import DNSException from ipalib import errors from ipalib.text import _ -from ipapython.ipautil import decode_ssh_pubkey +from ipapython.ssh import SSHPublicKey from ipapython.dn import DN, RDN @@ -266,37 +266,55 @@ def validate_hostname(hostname, check_fqdn=True, allow_underscore=False): else: validate_domain_name(hostname,allow_underscore) -def validate_sshpubkey(ugettext, pubkey): +def normalize_sshpubkey(value): + return SSHPublicKey(value).openssh() + +def validate_sshpubkey(ugettext, value): try: - algo, data, fp = decode_ssh_pubkey(pubkey) - except ValueError: + SSHPublicKey(value) + except ValueError, UnicodeDecodeError: return _('invalid SSH public key') -def output_sshpubkey(ldap, dn, entry_attrs): +def validate_sshpubkey_no_options(ugettext, value): + try: + pubkey = SSHPublicKey(value) + except ValueError, UnicodeDecodeError: + return _('invalid SSH public key') + + if pubkey.has_options(): + return _('options are not allowed') + +def convert_sshpubkey_post(ldap, dn, entry_attrs): if 'ipasshpubkey' in entry_attrs: - pubkeys = entry_attrs.get('ipasshpubkey') + pubkeys = entry_attrs.pop('ipasshpubkey') else: - entry = ldap.get_entry(dn, ['ipasshpubkey']) - pubkeys = entry[1].get('ipasshpubkey') - if pubkeys is None: + old_entry_attrs = ldap.get_entry(dn, ['ipasshpubkey']) + pubkeys = old_entry_attrs[1].get('ipasshpubkey') + if not pubkeys: return + newpubkeys = [] fingerprints = [] for pubkey in pubkeys: try: - algo, data, fp = decode_ssh_pubkey(pubkey) - fp = u':'.join([fp[j:j+2] for j in range(0, len(fp), 2)]) - fingerprints.append(u'%s (%s)' % (fp, algo)) - except ValueError: - pass + pubkey = SSHPublicKey(pubkey) + except ValueError, UnicodeDecodeError: + continue + + fp = pubkey.fingerprint_hex_md5() + comment = pubkey.comment() + if comment: + fp = u'%s %s' % (fp, comment) + fp = u'%s (%s)' % (fp, pubkey.keytype()) + + newpubkeys.append(pubkey.openssh()) + fingerprints.append(fp) + + if newpubkeys: + entry_attrs['ipasshpubkey'] = newpubkeys if fingerprints: entry_attrs['sshpubkeyfp'] = fingerprints -def normalize_sshpubkeyfp(value): - value = value.split()[0] - value = unicode(c for c in value if c in '0123456789ABCDEFabcdef') - return value - class cachedproperty(object): """ A property-like attribute that caches the return value of a method call. diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index a212aa6ef..a3fd83e45 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -64,7 +64,6 @@ except ImportError: self.cmd = cmd def __str__(self): return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) -from ipapython.compat import sha1, md5 def get_domain_name(): try: @@ -1022,25 +1021,6 @@ def backup_config_and_replace_variables( return old_values -def decode_ssh_pubkey(data, fptype=md5): - try: - (algolen,) = struct.unpack('>I', data[:4]) - if algolen > 0 and algolen <= len(data) - 4: - return (data[4:algolen+4], data[algolen+4:], fptype(data).hexdigest().upper()) - except struct.error: - pass - raise ValueError('not a SSH public key') - -def make_sshfp(key): - algo, data, fp = decode_ssh_pubkey(key, fptype=sha1) - if algo == 'ssh-rsa': - algo = 1 - elif algo == 'ssh-dss': - algo = 2 - else: - return - return '%d 1 %s' % (algo, fp) - def utf8_encode_value(value): if isinstance(value, unicode): diff --git a/ipapython/ssh.py b/ipapython/ssh.py new file mode 100644 index 000000000..667d21e1e --- /dev/null +++ b/ipapython/ssh.py @@ -0,0 +1,199 @@ +# Authors: +# Jan Cholasta <jcholast@redhat.com> +# +# Copyright (C) 2012 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +""" +SSH utilities. +""" + +import base64 +import re +import struct + +from ipapython.compat import md5, sha1 + +__all__ = ['SSHPublicKey'] + +OPENSSH_BASE_REGEX = re.compile(r'^[\t ]*(?P<keytype>[^\x00\n\r]+?) [\t ]*(?P<key>[^\x00\n\r]+?)(?:[\t ]+(?P<comment>[^\x00\n\r]*?)[\t ]*)?$') +OPENSSH_OPTIONS_REGEX = re.compile(r'(?P<name>[-0-9A-Za-z]+)(?:="(?P<value>(?:\\"|[^\x00\n\r"])*)")?') + +class SSHPublicKey(object): + """ + SSH public key object. + """ + + __slots__ = ('_key', '_keytype', '_comment', '_options') + + def __init__(self, key, comment=None, options=None, encoding='utf-8'): + if isinstance(key, SSHPublicKey): + self._key = key._key + self._keytype = key._keytype + self._comment = key._comment + self._options = key._options + return + + if not isinstance(key, (str, unicode)): + raise TypeError("argument must be str or unicode, got %s" % type(key).__name__) + + # All valid public key blobs start with 3 null bytes (see RFC 4253 + # section 6.6, RFC 4251 section 5 and RFC 4250 section 4.6) + if isinstance(key, str) and key[:3] != '\0\0\0': + key = key.decode(encoding) + + valid = self._parse_raw(key) or self._parse_base64(key) or self._parse_openssh(key) + if not valid: + raise ValueError("not a valid SSH public key") + + if comment is not None: + self._comment = comment + if options is not None: + self._options = options + + def _parse_raw(self, key): + if not isinstance(key, str): + return False + + try: + (ktlen,) = struct.unpack('>I', key[:4]) + except struct.error: + return False + + if ktlen < 1 or ktlen > len(key) - 4: + return False + + try: + keytype = key[4:ktlen+4].decode('ascii') + except UnicodeDecodeError: + return False + + self._key = key + self._keytype = keytype + self._options = {} + self._comment = None + + return True + + def _parse_base64(self, key): + if not isinstance(key, unicode): + return False + + try: + key = base64.b64decode(key) + except TypeError: + return False + + return self._parse_raw(key) + + def _parse_openssh_without_options(self, key): + match = OPENSSH_BASE_REGEX.match(key) + if not match: + return False + + if not self._parse_base64(match.group('key')): + return False + + if self._keytype != match.group('keytype'): + return False + + self._comment = match.group('comment') + + return True + + def _parse_openssh_with_options(self, key): + key = key.lstrip('\t ') + + options = {} + while True: + match = OPENSSH_OPTIONS_REGEX.match(key) + if not match: + return False + + name = match.group('name').lower() + value = match.group('value') + if value: + value = value.replace('\\"', '"') + + options[name] = value + + key = key[len(match.group(0)):] + key0, key = key[:1], key[1:] + + if key0 != ',': + break + + if not self._parse_openssh_without_options(key): + return False + + self._options = options + + return True + + def _parse_openssh(self, key): + if not isinstance(key, unicode): + return False + + if self._parse_openssh_without_options(key): + return True + else: + return self._parse_openssh_with_options(key) + + def keytype(self): + return self._keytype + + def comment(self): + return self._comment + + def has_options(self): + return bool(self._options) + + def openssh(self): + out = u'%s %s' % (self._keytype, base64.b64encode(self._key)) + + if self._options: + options = [] + for name in sorted(self._options): + value = self._options[name] + if value is None: + options.append(name) + else: + value = value.replace('"', '\\"') + options.append(u'%s="%s"' % (name, value)) + options = u','.join(options) + + out = u'%s %s' % (options, out) + + if self._comment: + out = u'%s %s' % (out, self._comment) + + return out + + def fingerprint_hex_md5(self): + fp = md5(self._key).hexdigest().upper() + fp = u':'.join([fp[j:j+2] for j in range(0, len(fp), 2)]) + return fp + + def fingerprint_dns_sha1(self): + if self._keytype == 'ssh-rsa': + keytype = 1 + elif self._keytype == 'ssh-dss': + keytype = 2 + else: + return + fp = sha1(self._key).hexdigest().upper() + return '%d 1 %s' % (keytype, fp) diff --git a/tests/test_ipapython/test_ssh.py b/tests/test_ipapython/test_ssh.py new file mode 100644 index 000000000..2640af50d --- /dev/null +++ b/tests/test_ipapython/test_ssh.py @@ -0,0 +1,76 @@ +# Authors: +# Jan Cholasta <jcholast@redhat.com> +# +# Copyright (C) 2011 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Test the `ipapython/ssh.py` module. +""" + +import base64 +import nose + +from ipapython import ssh + +class CheckPublicKey: + def __init__(self, pk): + self.description = "Test SSH public key parsing (%s)" % repr(pk) + + def __call__(self, pk, out): + try: + parsed = ssh.SSHPublicKey(pk) + assert parsed.openssh() == out + except Exception, e: + assert type(e) is out + +def test_public_key_parsing(): + b64 = 'AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L' + raw = base64.b64decode(b64) + openssh = 'ssh-rsa %s' % b64 + + pks = [ + ('\xff', UnicodeDecodeError), + + (raw, openssh), + ('\0\0\0\x04none', u'none AAAABG5vbmU='), + ('\0\0\0', ValueError), + ('\0\0\0\0', ValueError), + ('\0\0\0\x01', ValueError), + ('\0\0\0\x01\xff', ValueError), + + (b64, openssh), + (unicode(b64), openssh), + (u'\n%s\n\n' % b64, openssh), + (u'AAAABG5vbmU=', u'none AAAABG5vbmU='), + (u'AAAAB', ValueError), + + (openssh, openssh), + (unicode(openssh), openssh), + (u'none AAAABG5vbmU=', u'none AAAABG5vbmU='), + (u'\t \t ssh-rsa \t \t%s\t \tthis is a comment\t \t ' % b64, + u'%s this is a comment' % openssh), + (u'opt3,opt2="\tx ",opt1,opt2="\\"x " %s comment ' % openssh, + u'opt1,opt2="\\"x ",opt3 %s comment' % openssh), + (u'ssh-rsa\n%s' % b64, ValueError), + (u'ssh-rsa\t%s' % b64, ValueError), + (u'vanitas %s' % b64, ValueError), + (u'@opt %s' % openssh, ValueError), + (u'opt=val %s' % openssh, ValueError), + (u'opt, %s' % openssh, ValueError), + ] + + for pk in pks: + yield (CheckPublicKey(pk[0]),) + pk diff --git a/tests/test_xmlrpc/test_host_plugin.py b/tests/test_xmlrpc/test_host_plugin.py index 27d3adb04..b3eb3151e 100644 --- a/tests/test_xmlrpc/test_host_plugin.py +++ b/tests/test_xmlrpc/test_host_plugin.py @@ -62,6 +62,9 @@ servercert = ''.join(servercert) servercert = x509.strip_header(servercert) fd.close() +sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test' +sshpubkeyfp = u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B public key test (ssh-rsa)' + class test_host(Declarative): cleanup_commands = [ @@ -542,6 +545,45 @@ class test_host(Declarative): dict( + desc='Add SSH public key to %r' % fqdn1, + command=('host_mod', [fqdn1], dict(ipasshpubkey=[sshpubkey])), + expected=dict( + value=fqdn1, + summary=u'Modified host "%s"' % fqdn1, + result=dict( + description=[u'Updated host 1'], + fqdn=[fqdn1], + l=[u'Undisclosed location 1'], + krbprincipalname=[u'host/%s@%s' % (fqdn1, api.env.realm)], + managedby_host=[u'%s' % fqdn1], + usercertificate=[base64.b64decode(servercert)], + valid_not_before=fuzzy_date, + valid_not_after=fuzzy_date, + subject=DN(('CN',api.env.host),x509.subject_base()), + serial_number=fuzzy_digits, + serial_number_hex=fuzzy_hex, + md5_fingerprint=fuzzy_hash, + sha1_fingerprint=fuzzy_hash, + issuer=fuzzy_issuer, + macaddress=[u'00:50:56:30:F6:5F', u'00:50:56:2C:8D:82'], + ipasshpubkey=[sshpubkey], + sshpubkeyfp=[sshpubkeyfp], + has_keytab=False, + has_password=False, + ), + ), + ), + + + dict( + desc='Add an illegal SSH public key to %r' % fqdn1, + command=('host_mod', [fqdn1], dict(ipasshpubkey=[u'no-pty %s' % sshpubkey])), + expected=errors.ValidationError(name='sshpubkey', + error=u'options are not allowed'), + ), + + + dict( desc='Delete %r' % fqdn1, command=('host_del', [fqdn1], {}), expected=dict( diff --git a/tests/test_xmlrpc/test_user_plugin.py b/tests/test_xmlrpc/test_user_plugin.py index d374e02f0..15a195590 100644 --- a/tests/test_xmlrpc/test_user_plugin.py +++ b/tests/test_xmlrpc/test_user_plugin.py @@ -40,6 +40,9 @@ admins_group=u'admins' invaliduser1=u'+tuser1' invaliduser2=u'tuser1234567890123456789012345678901234567890' +sshpubkey = u'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDGAX3xAeLeaJggwTqMjxNwa6XHBUAikXPGMzEpVrlLDCZtv00djsFTBi38PkgxBJVkgRWMrcBsr/35lq7P6w8KGIwA8GI48Z0qBS2NBMJ2u9WQ2hjLN6GdMlo77O0uJY3251p12pCVIS/bHRSq8kHO2No8g7KA9fGGcagPfQH+ee3t7HUkpbQkFTmbPPN++r3V8oVUk5LxbryB3UIIVzNmcSIn3JrXynlvui4MixvrtX6zx+O/bBo68o8/eZD26QrahVbA09fivrn/4h3TM019Eu/c2jOdckfU3cHUV/3Tno5d6JicibyaoDDK7S/yjdn5jhaz8MSEayQvFkZkiF0L public key test' +sshpubkeyfp = u'13:67:6B:BF:4E:A2:05:8E:AE:25:8B:A1:31:DE:6F:1B public key test (ssh-rsa)' + def get_user_dn(uid): return DN(('uid', uid), api.env.container_user, api.env.basedn) @@ -563,6 +566,64 @@ class test_user(Declarative): dict( + desc='Create "%s" with SSH public key' % user1, + command=( + 'user_add', [user1], dict(givenname=u'Test', sn=u'User1', ipasshpubkey=[sshpubkey]) + ), + expected=dict( + value=user1, + summary=u'Added user "%s"' % user1, + result=dict( + gecos=[u'Test User1'], + givenname=[u'Test'], + homedirectory=[u'/home/tuser1'], + krbprincipalname=[u'tuser1@' + api.env.realm], + loginshell=[u'/bin/sh'], + objectclass=objectclasses.user, + sn=[u'User1'], + uid=[user1], + uidnumber=[fuzzy_digits], + gidnumber=[fuzzy_digits], + displayname=[u'Test User1'], + cn=[u'Test User1'], + initials=[u'TU'], + mail=[u'%s@%s' % (user1, api.env.domain)], + ipasshpubkey=[sshpubkey], + sshpubkeyfp=[sshpubkeyfp], + ipauniqueid=[fuzzy_uuid], + krbpwdpolicyreference=[DN(('cn','global_policy'),('cn',api.env.realm), + ('cn','kerberos'),api.env.basedn)], + mepmanagedentry=[get_group_dn(user1)], + memberof_group=[u'ipausers'], + has_keytab=False, + has_password=False, + dn=get_user_dn(user1), + ), + ), + extra_check = upg_check, + ), + + + dict( + desc='Add an illegal SSH public key to "%r"' % user1, + command=('user_mod', [user1], dict(ipasshpubkey=[u"anal nathrach orth' bhais's bethad do che'l de'nmha"])), + expected=errors.ValidationError(name='sshpubkey', + error=u'invalid SSH public key'), + ), + + + dict( + desc='Delete "%s"' % user1, + command=('user_del', [user1], {}), + expected=dict( + result=dict(failed=u''), + summary=u'Deleted user "%s"' % user1, + value=user1, + ), + ), + + + dict( desc='Create "%s"' % user1, command=( 'user_add', [user1], dict(givenname=u'Test', sn=u'User1') |