diff options
author | Jan Cholasta <jcholast@redhat.com> | 2016-04-28 10:30:05 +0200 |
---|---|---|
committer | Jan Cholasta <jcholast@redhat.com> | 2016-06-03 09:00:34 +0200 |
commit | 6e44557b601f769d23ee74555a72e8b5cc62c0c9 (patch) | |
tree | eedd3e054b0709341b9f58c190ea54f999f7d13a /ipaserver/plugins/cert.py | |
parent | ec841e5d7ab29d08de294b3fa863a631cd50e30a (diff) | |
download | freeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.tar.gz freeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.tar.xz freeipa-6e44557b601f769d23ee74555a72e8b5cc62c0c9.zip |
ipalib: move server-side plugins to ipaserver
Move the remaining plugin code from ipalib.plugins to ipaserver.plugins.
Remove the now unused ipalib.plugins package.
https://fedorahosted.org/freeipa/ticket/4739
Reviewed-By: David Kupka <dkupka@redhat.com>
Diffstat (limited to 'ipaserver/plugins/cert.py')
-rw-r--r-- | ipaserver/plugins/cert.py | 835 |
1 files changed, 835 insertions, 0 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py new file mode 100644 index 000000000..cbb5382fb --- /dev/null +++ b/ipaserver/plugins/cert.py @@ -0,0 +1,835 @@ +# Authors: +# Andrew Wnuk <awnuk@redhat.com> +# Jason Gerard DeRose <jderose@redhat.com> +# John Dennis <jdennis@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import os +import time +import binascii + +from ipalib import Command, Str, Int, Flag +from ipalib import api +from ipalib import errors +from ipalib import pkcs10 +from ipalib import x509 +from ipalib import ngettext +from ipalib.plugable import Registry +from .virtual import VirtualCommand +from .baseldap import pkey_to_value +from .service import split_any_principal +from .certprofile import validate_profile_id +from .caacl import acl_evaluate +from ipalib.text import _ +from ipalib.request import context +from ipalib import output +from .service import validate_principal +from ipapython.dn import DN + +import six +import nss.nss as nss +from nss.error import NSPRError +from pyasn1.error import PyAsn1Error + +if six.PY3: + unicode = str + +__doc__ = _(""" +IPA certificate operations + +Implements a set of commands for managing server SSL certificates. + +Certificate requests exist in the form of a Certificate Signing Request (CSR) +in PEM format. + +The dogtag CA uses just the CN value of the CSR and forces the rest of the +subject to values configured in the server. + +A certificate is stored with a service principal and a service principal +needs a host. + +In order to request a certificate: + +* The host must exist +* The service must exist (or you use the --add option to automatically add it) + +SEARCHING: + +Certificates may be searched on by certificate subject, serial number, +revocation reason, validity dates and the issued date. + +When searching on dates the _from date does a >= search and the _to date +does a <= search. When combined these are done as an AND. + +Dates are treated as GMT to match the dates in the certificates. + +The date format is YYYY-mm-dd. + +EXAMPLES: + + Request a new certificate and add the principal: + ipa cert-request --add --principal=HTTP/lion.example.com example.csr + + Retrieve an existing certificate: + ipa cert-show 1032 + + Revoke a certificate (see RFC 5280 for reason details): + ipa cert-revoke --revocation-reason=6 1032 + + Remove a certificate from revocation hold status: + ipa cert-remove-hold 1032 + + Check the status of a signing request: + ipa cert-status 10 + + Search for certificates by hostname: + ipa cert-find --subject=ipaserver.example.com + + Search for revoked certificates by reason: + ipa cert-find --revocation-reason=5 + + Search for certificates based on issuance date + ipa cert-find --issuedon-from=2013-02-01 --issuedon-to=2013-02-07 + +IPA currently immediately issues (or declines) all certificate requests so +the status of a request is not normally useful. This is for future use +or the case where a CA does not immediately issue a certificate. + +The following revocation reasons are supported: + + * 0 - unspecified + * 1 - keyCompromise + * 2 - cACompromise + * 3 - affiliationChanged + * 4 - superseded + * 5 - cessationOfOperation + * 6 - certificateHold + * 8 - removeFromCRL + * 9 - privilegeWithdrawn + * 10 - aACompromise + +Note that reason code 7 is not used. See RFC 5280 for more details: + +http://www.ietf.org/rfc/rfc5280.txt + +""") + +USER, HOST, SERVICE = range(3) + +register = Registry() + +def validate_pkidate(ugettext, value): + """ + A date in the format of %Y-%m-%d + """ + try: + ts = time.strptime(value, '%Y-%m-%d') + except ValueError as e: + return str(e) + + return None + +def validate_csr(ugettext, csr): + """ + Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 + parser. + """ + if api.env.context == 'cli': + # If we are passed in a pointer to a valid file on the client side + # escape and let the load_files() handle things + if csr and os.path.exists(csr): + return + try: + request = pkcs10.load_certificate_request(csr) + except (TypeError, binascii.Error) as e: + raise errors.Base64DecodeError(reason=str(e)) + except Exception as e: + raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % e) + +def normalize_csr(csr): + """ + Strip any leading and trailing cruft around the BEGIN/END block + """ + end_len = 37 + s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----') + if s == -1: + s = csr.find('-----BEGIN CERTIFICATE REQUEST-----') + e = csr.find('-----END NEW CERTIFICATE REQUEST-----') + if e == -1: + e = csr.find('-----END CERTIFICATE REQUEST-----') + if e != -1: + end_len = 33 + + if s > -1 and e > -1: + # We're normalizing here, not validating + csr = csr[s:e+end_len] + + return csr + +def _convert_serial_number(num): + """ + Convert a SN given in decimal or hexadecimal. + Returns the number or None if conversion fails. + """ + # plain decimal or hexa with radix prefix + try: + num = int(num, 0) + except ValueError: + try: + # hexa without prefix + num = int(num, 16) + except ValueError: + num = None + + return num + +def validate_serial_number(ugettext, num): + if _convert_serial_number(num) == None: + return u"Decimal or hexadecimal number is required for serial number" + return None + +def normalize_serial_number(num): + # It's been already validated + return unicode(_convert_serial_number(num)) + +def get_host_from_principal(principal): + """ + Given a principal with or without a realm return the + host portion. + """ + validate_principal(None, principal) + realm = principal.find('@') + slash = principal.find('/') + if realm == -1: + realm = len(principal) + hostname = principal[slash+1:realm] + + return hostname + +def ca_enabled_check(): + if not api.Command.ca_is_enabled()['result']: + raise errors.NotFound(reason=_('CA is not configured')) + +def caacl_check(principal_type, principal_string, ca, profile_id): + principal_type_map = {USER: 'user', HOST: 'host', SERVICE: 'service'} + if not acl_evaluate( + principal_type_map[principal_type], + principal_string, ca, profile_id): + raise errors.ACIError(info=_( + "Principal '%(principal)s' " + "is not permitted to use CA '%(ca)s' " + "with profile '%(profile_id)s' for certificate issuance." + ) % dict( + principal=principal_string, + ca=ca or '.', + profile_id=profile_id + ) + ) + +@register() +class cert_request(VirtualCommand): + __doc__ = _('Submit a certificate signing request.') + + takes_args = ( + Str( + 'csr', validate_csr, + label=_('CSR'), + cli_name='csr_file', + normalizer=normalize_csr, + noextrawhitespace=False, + ), + ) + operation="request certificate" + + takes_options = ( + Str('principal', + label=_('Principal'), + doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), + ), + Str('request_type', + default=u'pkcs10', + autofill=True, + ), + Flag('add', + doc=_("automatically add the principal if it doesn't exist"), + default=False, + autofill=True + ), + Str('profile_id?', validate_profile_id, + label=_("Profile ID"), + doc=_("Certificate Profile to use"), + ) + ) + + has_output_params = ( + Str('certificate', + label=_('Certificate'), + ), + Str('subject', + label=_('Subject'), + ), + Str('issuer', + label=_('Issuer'), + ), + Str('valid_not_before', + label=_('Not Before'), + ), + Str('valid_not_after', + label=_('Not After'), + ), + Str('md5_fingerprint', + label=_('Fingerprint (MD5)'), + ), + Str('sha1_fingerprint', + label=_('Fingerprint (SHA1)'), + ), + Str('serial_number', + label=_('Serial number'), + ), + Str('serial_number_hex', + label=_('Serial number (hex)'), + ), + ) + + has_output = ( + output.Output('result', + type=dict, + doc=_('Dictionary mapping variable name to value'), + ), + ) + + def execute(self, csr, **kw): + ca_enabled_check() + + ldap = self.api.Backend.ldap2 + add = kw.get('add') + request_type = kw.get('request_type') + profile_id = kw.get('profile_id', self.Backend.ra.DEFAULT_PROFILE) + ca = '.' # top-level CA hardcoded until subca plugin implemented + + """ + Access control is partially handled by the ACI titled + 'Hosts can modify service userCertificate'. This is for the case + where a machine binds using a host/ prinicpal. It can only do the + request if the target hostname is in the managedBy attribute which + is managed using the add/del member commands. + + Binding with a user principal one needs to be in the request_certs + taskgroup (directly or indirectly via role membership). + """ + + principal_string = kw.get('principal') + principal = split_any_principal(principal_string) + servicename, principal_name, realm = principal + if servicename is None: + principal_type = USER + elif servicename == 'host': + principal_type = HOST + else: + principal_type = SERVICE + + bind_principal = split_any_principal(getattr(context, 'principal')) + bind_service, bind_name, bind_realm = bind_principal + + if bind_service is None: + bind_principal_type = USER + elif bind_service == 'host': + bind_principal_type = HOST + else: + bind_principal_type = SERVICE + + if bind_principal != principal and bind_principal_type != HOST: + # Can the bound principal request certs for another principal? + self.check_access() + + try: + self.check_access("request certificate ignore caacl") + bypass_caacl = True + except errors.ACIError: + bypass_caacl = False + + if not bypass_caacl: + caacl_check(principal_type, principal_string, ca, profile_id) + + try: + subject = pkcs10.get_subject(csr) + extensions = pkcs10.get_extensions(csr) + subjectaltname = pkcs10.get_subjectaltname(csr) or () + except (NSPRError, PyAsn1Error, ValueError) as e: + raise errors.CertificateOperationError( + error=_("Failure decoding Certificate Signing Request: %s") % e) + + # self-service and host principals may bypass SAN permission check + if bind_principal != principal and bind_principal_type != HOST: + if '2.5.29.17' in extensions: + self.check_access('request certificate with subjectaltname') + + dn = None + principal_obj = None + # See if the service exists and punt if it doesn't and we aren't + # going to add it + try: + if principal_type == SERVICE: + principal_obj = api.Command['service_show'](principal_string, all=True) + elif principal_type == HOST: + principal_obj = api.Command['host_show'](principal_name, all=True) + elif principal_type == USER: + principal_obj = api.Command['user_show'](principal_name, all=True) + except errors.NotFound as e: + if principal_type == SERVICE and add: + principal_obj = api.Command['service_add'](principal_string, force=True) + else: + raise errors.NotFound( + reason=_("The principal for this request doesn't exist.")) + principal_obj = principal_obj['result'] + dn = principal_obj['dn'] + + # Ensure that the DN in the CSR matches the principal + cn = subject.common_name #pylint: disable=E1101 + if not cn: + raise errors.ValidationError(name='csr', + error=_("No Common Name was found in subject of request.")) + + if principal_type in (SERVICE, HOST): + if cn.lower() != principal_name.lower(): + raise errors.ACIError( + info=_("hostname in subject of request '%(cn)s' " + "does not match principal hostname '%(hostname)s'") + % dict(cn=cn, hostname=principal_name)) + elif principal_type == USER: + # check user name + if cn != principal_name: + raise errors.ValidationError( + name='csr', + error=_("DN commonName does not match user's login") + ) + + # check email address + mail = subject.email_address #pylint: disable=E1101 + if mail is not None and mail not in principal_obj.get('mail', []): + raise errors.ValidationError( + name='csr', + error=_( + "DN emailAddress does not match " + "any of user's email addresses") + ) + + # We got this far so the principal entry exists, can we write it? + if not ldap.can_write(dn, "usercertificate"): + raise errors.ACIError(info=_("Insufficient 'write' privilege " + "to the 'userCertificate' attribute of entry '%s'.") % dn) + + # Validate the subject alt name, if any + for name_type, name in subjectaltname: + if name_type == pkcs10.SAN_DNSNAME: + name = unicode(name) + alt_principal_obj = None + alt_principal_string = None + try: + if principal_type == HOST: + alt_principal_string = 'host/%s@%s' % (name, realm) + alt_principal_obj = api.Command['host_show'](name, all=True) + elif principal_type == SERVICE: + alt_principal_string = '%s/%s@%s' % (servicename, name, realm) + alt_principal_obj = api.Command['service_show']( + alt_principal_string, all=True) + elif principal_type == USER: + raise errors.ValidationError( + name='csr', + error=_("subject alt name type %s is forbidden " + "for user principals") % name_type + ) + except errors.NotFound: + # We don't want to issue any certificates referencing + # machines we don't know about. Nothing is stored in this + # host record related to this certificate. + raise errors.NotFound(reason=_('The service principal for ' + 'subject alt name %s in certificate request does not ' + 'exist') % name) + if alt_principal_obj is not None: + altdn = alt_principal_obj['result']['dn'] + if not ldap.can_write(altdn, "usercertificate"): + raise errors.ACIError(info=_( + "Insufficient privilege to create a certificate " + "with subject alt name '%s'.") % name) + if alt_principal_string is not None and not bypass_caacl: + caacl_check( + principal_type, alt_principal_string, ca, profile_id) + elif name_type in (pkcs10.SAN_OTHERNAME_KRB5PRINCIPALNAME, + pkcs10.SAN_OTHERNAME_UPN): + if split_any_principal(name) != principal: + raise errors.ACIError( + info=_("Principal '%s' in subject alt name does not " + "match requested principal") % name) + elif name_type == pkcs10.SAN_RFC822NAME: + if principal_type == USER: + if name not in principal_obj.get('mail', []): + raise errors.ValidationError( + name='csr', + error=_( + "RFC822Name does not match " + "any of user's email addresses") + ) + else: + raise errors.ValidationError( + name='csr', + error=_("subject alt name type %s is forbidden " + "for non-user principals") % name_type + ) + else: + raise errors.ACIError( + info=_("Subject alt name type %s is forbidden") % + name_type) + + # Request the certificate + result = self.Backend.ra.request_certificate( + csr, profile_id, request_type=request_type) + cert = x509.load_certificate(result['certificate']) + result['issuer'] = unicode(cert.issuer) + result['valid_not_before'] = unicode(cert.valid_not_before_str) + result['valid_not_after'] = unicode(cert.valid_not_after_str) + result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) + result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + + # Success? Then add it to the principal's entry + # (unless the profile tells us not to) + profile = api.Command['certprofile_show'](profile_id) + store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE' + if store and 'certificate' in result: + cert = str(result.get('certificate')) + kwargs = dict(addattr=u'usercertificate={}'.format(cert)) + if principal_type == SERVICE: + api.Command['service_mod'](principal_string, **kwargs) + elif principal_type == HOST: + api.Command['host_mod'](principal_name, **kwargs) + elif principal_type == USER: + api.Command['user_mod'](principal_name, **kwargs) + + return dict( + result=result + ) + + + +@register() +class cert_status(VirtualCommand): + __doc__ = _('Check the status of a certificate signing request.') + + takes_args = ( + Str('request_id', + label=_('Request id'), + flags=['no_create', 'no_update', 'no_search'], + ), + ) + has_output_params = ( + Str('cert_request_status', + label=_('Request status'), + ), + ) + operation = "certificate status" + + + def execute(self, request_id, **kw): + ca_enabled_check() + self.check_access() + return dict( + result=self.Backend.ra.check_request_status(request_id) + ) + + + +_serial_number = Str('serial_number', + validate_serial_number, + label=_('Serial number'), + doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'), + normalizer=normalize_serial_number, +) + +@register() +class cert_show(VirtualCommand): + __doc__ = _('Retrieve an existing certificate.') + + takes_args = _serial_number + + has_output_params = ( + Str('certificate', + label=_('Certificate'), + ), + Str('subject', + label=_('Subject'), + ), + Str('issuer', + label=_('Issuer'), + ), + Str('valid_not_before', + label=_('Not Before'), + ), + Str('valid_not_after', + label=_('Not After'), + ), + Str('md5_fingerprint', + label=_('Fingerprint (MD5)'), + ), + Str('sha1_fingerprint', + label=_('Fingerprint (SHA1)'), + ), + Str('revocation_reason', + label=_('Revocation reason'), + ), + Str('serial_number_hex', + label=_('Serial number (hex)'), + ), + ) + + takes_options = ( + Str('out?', + label=_('Output filename'), + doc=_('File to store the certificate in.'), + exclude='webui', + ), + ) + + operation="retrieve certificate" + + def execute(self, serial_number, **options): + ca_enabled_check() + hostname = None + try: + self.check_access() + except errors.ACIError as acierr: + self.debug("Not granted by ACI to retrieve certificate, looking at principal") + bind_principal = getattr(context, 'principal') + if not bind_principal.startswith('host/'): + raise acierr + hostname = get_host_from_principal(bind_principal) + + result=self.Backend.ra.get_certificate(serial_number) + cert = x509.load_certificate(result['certificate']) + result['subject'] = unicode(cert.subject) + result['issuer'] = unicode(cert.issuer) + result['valid_not_before'] = unicode(cert.valid_not_before_str) + result['valid_not_after'] = unicode(cert.valid_not_after_str) + result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) + result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + if hostname: + # If we have a hostname we want to verify that the subject + # of the certificate matches it, otherwise raise an error + if hostname != cert.subject.common_name: #pylint: disable=E1101 + raise acierr + + return dict(result=result) + + + + +@register() +class cert_revoke(VirtualCommand): + __doc__ = _('Revoke a certificate.') + + takes_args = _serial_number + + has_output_params = ( + Flag('revoked', + label=_('Revoked'), + ), + ) + operation = "revoke certificate" + + # FIXME: The default is 0. Is this really an Int param? + takes_options = ( + Int('revocation_reason', + label=_('Reason'), + doc=_('Reason for revoking the certificate (0-10). Type ' + '"ipa help cert" for revocation reason details. '), + minvalue=0, + maxvalue=10, + default=0, + autofill=True + ), + ) + + def execute(self, serial_number, **kw): + ca_enabled_check() + hostname = None + try: + self.check_access() + except errors.ACIError as acierr: + self.debug("Not granted by ACI to revoke certificate, looking at principal") + try: + # Let cert_show() handle verifying that the subject of the + # cert we're dealing with matches the hostname in the principal + result = api.Command['cert_show'](unicode(serial_number))['result'] + except errors.NotImplementedError: + pass + revocation_reason = kw['revocation_reason'] + if revocation_reason == 7: + raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason')) + return dict( + result=self.Backend.ra.revoke_certificate( + serial_number, revocation_reason=revocation_reason) + ) + + + +@register() +class cert_remove_hold(VirtualCommand): + __doc__ = _('Take a revoked certificate off hold.') + + takes_args = _serial_number + + has_output_params = ( + Flag('unrevoked', + label=_('Unrevoked'), + ), + Str('error_string', + label=_('Error'), + ), + ) + operation = "certificate remove hold" + + def execute(self, serial_number, **kw): + ca_enabled_check() + self.check_access() + return dict( + result=self.Backend.ra.take_certificate_off_hold(serial_number) + ) + + + +@register() +class cert_find(Command): + __doc__ = _('Search for existing certificates.') + + takes_options = ( + Str('subject?', + label=_('Subject'), + doc=_('Subject'), + autofill=False, + ), + Int('revocation_reason?', + label=_('Reason'), + doc=_('Reason for revoking the certificate (0-10). Type ' + '"ipa help cert" for revocation reason details.'), + minvalue=0, + maxvalue=10, + autofill=False, + ), + Int('min_serial_number?', + doc=_("minimum serial number"), + autofill=False, + minvalue=0, + maxvalue=2147483647, + ), + Int('max_serial_number?', + doc=_("maximum serial number"), + autofill=False, + minvalue=0, + maxvalue=2147483647, + ), + Flag('exactly?', + doc=_('match the common name exactly'), + autofill=False, + ), + Str('validnotafter_from?', validate_pkidate, + doc=_('Valid not after from this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('validnotafter_to?', validate_pkidate, + doc=_('Valid not after to this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('validnotbefore_from?', validate_pkidate, + doc=_('Valid not before from this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('validnotbefore_to?', validate_pkidate, + doc=_('Valid not before to this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('issuedon_from?', validate_pkidate, + doc=_('Issued on from this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('issuedon_to?', validate_pkidate, + doc=_('Issued on to this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('revokedon_from?', validate_pkidate, + doc=_('Revoked on from this date (YYYY-mm-dd)'), + autofill=False, + ), + Str('revokedon_to?', validate_pkidate, + doc=_('Revoked on to this date (YYYY-mm-dd)'), + autofill=False, + ), + Int('sizelimit?', + label=_('Size Limit'), + doc=_('Maximum number of certs returned'), + flags=['no_display'], + minvalue=0, + default=100, + ), + ) + + has_output = output.standard_list_of_entries + has_output_params = ( + Str('serial_number_hex', + label=_('Serial number (hex)'), + ), + Str('serial_number', + label=_('Serial number'), + ), + Str('status', + label=_('Status'), + ), + ) + + msg_summary = ngettext( + '%(count)d certificate matched', '%(count)d certificates matched', 0 + ) + + def execute(self, **options): + ca_enabled_check() + ret = dict( + result=self.Backend.ra.find(options) + ) + ret['count'] = len(ret['result']) + ret['truncated'] = False + return ret + + +@register() +class ca_is_enabled(Command): + """ + Checks if any of the servers has the CA service enabled. + """ + NO_CLI = True + has_output = output.standard_value + + def execute(self, *args, **options): + base_dn = DN(('cn', 'masters'), ('cn', 'ipa'), ('cn', 'etc'), + self.api.env.basedn) + filter = '(&(objectClass=ipaConfigObject)(cn=CA))' + try: + self.api.Backend.ldap2.find_entries( + base_dn=base_dn, filter=filter, attrs_list=[]) + except errors.NotFound: + result = False + else: + result = True + return dict(result=result, value=pkey_to_value(None, options)) |