diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-11-24 16:07:44 -0500 |
---|---|---|
committer | Jason Gerard DeRose <jderose@redhat.com> | 2009-11-30 18:10:09 -0700 |
commit | ab1667f3c1607a22c6df49ceba58274347bc5826 (patch) | |
tree | bc2e6102d3d9cd103d2418ad5372e164e0e7533d /ipalib | |
parent | 7c2c2d6130648fb6dd7c0e52d802cc6eff39ef95 (diff) | |
download | freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.gz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.xz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.zip |
Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL.
The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify
requests with subject alt names.
Subject alt names are only allowed if:
- the host for the alt name exists in IPA
- if binding as host principal, the host is in the services managedBy attr
Diffstat (limited to 'ipalib')
-rw-r--r-- | ipalib/pkcs10.py | 439 | ||||
-rw-r--r-- | ipalib/plugins/cert.py | 90 | ||||
-rw-r--r-- | ipalib/plugins/service.py | 10 | ||||
-rw-r--r-- | ipalib/x509.py | 272 |
4 files changed, 784 insertions, 27 deletions
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py new file mode 100644 index 00000000..f3f82c40 --- /dev/null +++ b/ipalib/pkcs10.py @@ -0,0 +1,439 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +# Read PKCS#10 certificate requests (see RFC 2986 and 5280) + +# NOTE: Not every extension is currently handled. Known to now work: +# 2.5.29.37 - extKeyUsage + +import sys, string, base64 +from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful +from pyasn1.codec.der import decoder, encoder +from pyasn1 import error +import copy + +# Common OIDs found in a subject +oidtable = { "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + "1.2.840.113549.1.9.1": "E", + "0.9.2342.19200300.100.1.25": "DC", + } + +# Some useful OIDs +FRIENDLYNAME = '1.2.840.113549.1.9.20' +EXTENSIONREQUEST = '1.2.840.113549.1.9.14' + +MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h + +class DirectoryString(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + ) + +class AttributeValue(DirectoryString): pass + +class AttributeType(univ.ObjectIdentifier): pass + +class AttributeTypeAndValue(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type + ) + +class KeyPurposeId(univ.ObjectIdentifier): pass + +class ExtKeyUsageSyntax(univ.SequenceOf): + componentType = KeyPurposeId() + +class UPN(char.UTF8String): + tagSet = char.UTF8String.tagSet.tagExplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) + ) + +class AttributeValueSet(univ.SetOf): + componentType = univ.Any() + sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class Attribute(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('values', AttributeValueSet()), + ) + +class Attributes(univ.SetOf): + componentType = Attribute() + +class RelativeDistinguishedName(univ.SetOf): + componentType = AttributeTypeAndValue() + +class RDNSequence(univ.SequenceOf): + componentType = RelativeDistinguishedName() + +class Name(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('', RDNSequence()) + ) + + def get_components(self): + components = self.getComponentByPosition(0) + complist = [] + for idx in range(len(components)): + attrandvalue = components[idx].getComponentByPosition(0) + oid = attrandvalue.getComponentByPosition(0) + # FIXME, should handle any string type + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) + vout = value.prettyOut(value).decode('utf-8') + oidout = oid.prettyOut(oid).decode('utf-8') + c = ((oidtable.get(oidout, oidout), vout)) + complist.append(c) + + return tuple(complist) + +class AnotherName(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type-id', univ.ObjectIdentifier()), + namedtype.NamedType('value', univ.Any()) + ) + +class rfc822Name(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1) + ) + +class dNSName(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2) + ) + +class x400Address(univ.OctetString): + tagSet = univ.OctetString.tagSet.tagImplicitly( + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3) + ) + +class directoryName(Name): + tagSet = Name.tagSet.tagImplicitly( + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4) + ) + +class uniformResourceIdentifier(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6) + ) + +# Not all general types are handled, nor are these necessarily done +# per the specification. +class GeneralName(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('rfc822Name', rfc822Name()), #1 + namedtype.NamedType('dNSName', dNSName()), #2 + namedtype.NamedType('x400Address', x400Address()), #3 + namedtype.NamedType('directoryName', directoryName()), #4 + # 5 FIXME + namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6 +# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))), + ) + +class GeneralNames(univ.SequenceOf): + componentType = GeneralName() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class SubjectAltName(univ.SequenceOf): + componentType = GeneralName() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class DistributionPointName(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + ) + +class DistributionPoint(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME + namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + ) + +class cRLDistributionPoints(univ.SequenceOf): + componentType = DistributionPoint() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class basicConstraints(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.DefaultedNamedType('cA', univ.Boolean('False')), + namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()), + ) + +class AlgorithmIdentifier(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', univ.ObjectIdentifier()), + namedtype.OptionalNamedType('parameters', univ.Any()) + ) + +class SubjectPublicKeyInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', AlgorithmIdentifier()), + namedtype.NamedType('subjectPublicKey', univ.BitString()) + ) + +class Version(univ.Integer): pass + +class CertificationRequestInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('version', Version()), + namedtype.NamedType('subject', Name()), + namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), + namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))) + ) + +class CertificationRequest(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()), + namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), + namedtype.NamedType('signatureValue', univ.BitString()) + ) + + def get_version(self): + info = self.getComponentByName('certificationRequestInfo') + version = info.getComponentByName('version') + return version._value + + def get_subject(self): + info = self.getComponentByName('certificationRequestInfo') + return info.getComponentByName('subject') + + def get_subjectaltname(self): + attrs = self.get_attributes() + attrdict = dict(attrs) + if EXTENSIONREQUEST in attrdict: + # Extensions are a 3 position tuple + for ext in attrdict[EXTENSIONREQUEST]: + if ext[0] == '2.5.29.17': + # alt name is in the dNSName position + return ext[2][2] + + def get_attributes(self): + info = self.getComponentByName('certificationRequestInfo') + attrs = info.getComponentByName('attributes') + attributes = [] + + for idx in range(len(attrs)): + atype = attrs[idx].getComponentByPosition(0) + aval = attrs[idx].getComponentByPosition(1) + + # The attribute list is of type Any, need to re-encode + aenc = encoder.encode(aval, maxChunkSize=1024) + decoded = decoder.decode(aenc)[0] + oid = atype.prettyOut(atype) + + if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name + value = decoded.getComponentByPosition(0) + t = (oid, value.prettyOut(value).decode('utf-8')) + attributes.append(t) + elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req + extensions = [] + extlist = decoded.getComponentByPosition(0) + for jdx in range(len(extlist)): + ext = extlist.getComponentByPosition(jdx) + # An extension has 3 elements: + # oid + # bool - critical + # value + if len(ext) == 2: # If no critical, default to False + extoid = atype.prettyOut(ext.getComponentByPosition(0)) + critical = False + extvalue = ext.getComponentByPosition(1) + else: + extoid = atype.prettyOut(ext.getComponentByPosition(0)) + critical = bool(ext.getComponentByPosition(1)._value) + extvalue = ext.getComponentByPosition(2) + + if extoid == '2.5.29.19': # basicConstraints + extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0] + ca = bool(extdecoded[0]) + if len(extdecoded) == 2: # path length is optional + pathlen = extdecoded[1]._value + else: + pathlen = None + constraint = (ca, pathlen) + e = (extoid, critical, constraint) + extensions.append(e) + continue + elif extoid == '2.5.29.31': # cRLDistributionPoints + extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0] + distpoints = [] + for elem in range(len(extdecoded)): + name = extdecoded[elem] + # DistributionPoint is position 0 + distpoint = name.getComponentByPosition(0) + # fullName is position 0 + fullname = distpoint.getComponentByPosition(0) + for crl in range(len(fullname)): + # Get the GeneralName, URI type + uri = fullname.getComponentByPosition(crl).getComponentByPosition(5) + distpoints.append(uri.prettyOut(uri).decode('utf-8')) + e = (extoid, critical, tuple(distpoints)) + extensions.append(e) + continue + + # The data is is encoded as "Any". Pull the raw data out + # and re-decode it using a different specification. + try: + extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0] + except error.PyAsn1Error: + # I've seen CSRs where this isn't a sequence of names + # but is a single name, try to handle that too. + try: + extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0] + extdecoded = [extdecoded] + except error.PyAsn1Error, e: + # skip for now + generalnames = 9*["Error"] + e = (extoid, critical, tuple(generalnames)) + extensions.append(e) + continue + + # We now have a list of extensions in the order they + # are in the request as GeneralNames. We iterate through + # each of those to get a GeneralName. We then have to + # iterate through that to find the position set in it. + + # Note that not every type will be returned. Those that + # are handled are returned in a tuple in the position + # which they are in the request. + generalnames = 9*[None] + for elem in range(len(extdecoded)): + name = extdecoded[elem] + for n in range(len(name)): + if name[n] is None: + continue + if generalnames[n] is None: + generalnames[n] = [] + if n == 3: # OctetString + generalnames[n].append(name[n]._value) + if n in [1, 2, 6]: # IA5String + if n == 6 and extoid == "2.5.29.37": + # Extended key usage + v = copy.deepcopy(extvalue._value) + othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0] + keyusage = [] + for l in range(len(othername)): + keyusage.append(othername[l].prettyOut(othername[l])) + + generalnames[n] = tuple(keyusage) + else: + generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8')) + if n == 0: # AnotherName + nameoid = name[n].getComponentByPosition(0) + nameoid = nameoid.prettyOut(nameoid) + val = name[n].getComponentByPosition(1) + if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN + v = copy.deepcopy(val._value) + othername = decoder.decode(v, asn1Spec=UPN())[0] + generalnames[0].append(othername.prettyOut(othername).decode('utf-8')) + + e = (extoid, critical, tuple(generalnames)) + extensions.append(e) + t = (oid, tuple(extensions)) + attributes.append(t) + + return tuple(attributes) + +def strip_header(csr): + """ + Remove the header and footer from a CSR. + """ + s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") + if s == -1: + s = csr.find("-----BEGIN CERTIFICATE REQUEST-----") + if s >= 0: + e = csr.find("-----END") + csr = csr[s+40:e] + + return csr + +def load_certificate_request(csr): + """ + Given a base64-encoded certificate request, with or without the + header/footer, return a request object. + """ + csr = strip_header(csr) + + substrate = base64.b64decode(csr) + + return decoder.decode(substrate, asn1Spec=CertificationRequest())[0] + +if __name__ == '__main__': + # Read PEM certs from stdin and print them out in plain text + + stSpam, stHam, stDump = 0, 1, 2 + state = stSpam + + for certLine in sys.stdin.readlines(): + certLine = string.strip(certLine) + if state == stSpam: + if state == stSpam: + if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----': + certLines = [] + state = stHam + continue + if state == stHam: + if certLine == '-----END NEW CERTIFICATE REQUEST-----': + state = stDump + else: + certLines.append(certLine) + complist = [] + if state == stDump: + substrate = '' + for certLine in certLines: + substrate = substrate + base64.b64decode(certLine) + + request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0] + subject = request.get_subject() + attrs = request.get_attributes() + print "Attributes:" + print attrs + + print "Subject:" + complist = subject.get_components() + print complist + out="" + for c in complist: + out = out + "%s=%s," % (c[0], c[1]) + print out[:-1] + + print request.get_subjectaltname() + + # Re-encode the request just to be sure things are working + assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails' + + state = stSpam diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 48ceaa6d..ba088dd9 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -28,12 +28,16 @@ if api.env.enable_ra is not True: raise SkipPluginModule(reason='env.enable_ra is not True') from ipalib import Command, Str, Int, Bytes, Flag, File from ipalib import errors +from ipalib import pkcs10 +from ipalib import x509 from ipalib.plugins.virtual import * from ipalib.plugins.service import split_principal import base64 -from OpenSSL import crypto from ipalib.request import context from ipapython import dnsclient +from pyasn1.error import PyAsn1Error +import logging +import traceback def get_serial(certificate): """ @@ -45,9 +49,8 @@ def get_serial(certificate): if type(certificate) in (list, tuple): certificate = certificate[0] try: - x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) - serial = str(x509.get_serial_number()) - except crypto.Error: + serial = str(x509.get_serial_number(certificate)) + except PyAsn1Error: raise errors.GenericError(format='Unable to decode certificate in entry') return serial @@ -57,25 +60,49 @@ def get_csr_hostname(csr): Return the value of CN in the subject of the request """ try: - der = base64.b64decode(csr) - request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der) + request = pkcs10.load_certificate_request(csr) sub = request.get_subject().get_components() for s in sub: if s[0].lower() == "cn": return s[1] - except crypto.Error, e: - raise errors.GenericError(format='Unable to decode CSR: %s' % str(e)) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') return None +def get_subjectaltname(csr): + """ + Return the value of the subject alt name, if any + """ + try: + request = pkcs10.load_certificate_request(csr) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + return request.get_subjectaltname() + def validate_csr(ugettext, csr): """ - For now just verify that it is properly base64-encoded. + Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 + parser. """ try: - base64.b64decode(csr) - except Exception, e: + request = pkcs10.load_certificate_request(csr) + + # Explicitly request the attributes. This fires off additional + # decoding to get things like the subjectAltName. + attrs = request.get_attributes() + except TypeError, e: raise errors.Base64DecodeError(reason=str(e)) + except PyAsn1Error: + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + except Exception, e: + raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e)) class cert_request(VirtualCommand): @@ -107,38 +134,43 @@ class cert_request(VirtualCommand): def execute(self, csr, **kw): ldap = self.api.Backend.ldap2 - skw = {"all": True} principal = kw.get('principal') add = kw.get('add') del kw['principal'] del kw['add'] service = None - # We just want the CSR bits, make sure there is nothing else - s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") - e = csr.find("-----END NEW CERTIFICATE REQUEST-----") - if s >= 0: - csr = csr[s+40:e] + """ + Access control is partially handled by the ACI titled + 'Hosts can modify service userCertificate'. This is for the case + where a machine binds using a host/ prinicpal. It can only do the + request if the target hostname is in the managedBy attribute which + is managed using the add/del member commands. + + Binding with a user principal one needs to be in the request_certs + taskgroup (directly or indirectly via role membership). + """ # Can this user request certs? self.check_access() # FIXME: add support for subject alt name - # Is this cert for this principal? - subject_host = get_csr_hostname(csr) # Ensure that the hostname in the CSR matches the principal + subject_host = get_csr_hostname(csr) (servicename, hostname, realm) = split_principal(principal) if subject_host.lower() != hostname.lower(): raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname)) + dn = None + service = None # See if the service exists and punt if it doesn't and we aren't # going to add it try: - (dn, service) = api.Command['service_show'](principal, **skw) + (dn, service) = api.Command['service_show'](principal, all=True, raw=True) if 'usercertificate' in service: # FIXME, what to do here? Do we revoke the old cert? - raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate'])) + raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0]))) except errors.NotFound, e: if not add: raise errors.NotFound(reason="The service principal for this request doesn't exist.") @@ -151,6 +183,22 @@ class cert_request(VirtualCommand): if not ldap.can_write(dn, "usercertificate"): raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn) + # Validate the subject alt name, if any + subjectaltname = get_subjectaltname(csr) + if subjectaltname is not None: + for name in subjectaltname: + try: + (hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True) + except errors.NotFound: + # We don't want to issue any certificates referencing + # machines we don't know about. Nothing is stored in this + # host record related to this certificate. + raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name) + authprincipal = getattr(context, 'principal') + if authprincipal.startswith("host/"): + if not hostdn in service.get('managedby', []): + raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name) + # Request the certificate result = self.Backend.ra.request_certificate(csr, **kw) diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 449acbae..c88695e4 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -23,11 +23,10 @@ Services (Identity) """ import base64 -from OpenSSL import crypto - from ipalib import api, errors from ipalib import Str, Flag, Bytes from ipalib.plugins.baseldap import * +from ipalib import x509 def get_serial(certificate): @@ -35,8 +34,7 @@ def get_serial(certificate): Given a certificate, return the serial number in that cert. """ try: - x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) - serial = str(x509.get_serial_number()) + serial = str(x509.get_serial_number(certificate)) except crypto.Error: raise errors.GenericError( format='Unable to decode certificate in entry' @@ -247,7 +245,7 @@ api.register(service_show) class service_add_host(LDAPAddMember): """ - Add members to service. + Add hosts that can manage this service. """ member_attributes = ['managedby'] @@ -256,7 +254,7 @@ api.register(service_add_host) class service_remove_host(LDAPRemoveMember): """ - Remove members from service. + Remove hosts that can manage this service. """ member_attributes = ['managedby'] diff --git a/ipalib/x509.py b/ipalib/x509.py new file mode 100644 index 00000000..ee9ceb3e --- /dev/null +++ b/ipalib/x509.py @@ -0,0 +1,272 @@ +""" +Imported from pyasn1 project: + +Copyright (c) 2005-2009 Ilya Etingof <ilya@glas.net>, all rights reserved. + +THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION +ENDANGERING HUMAN LIFE OR PROPERTY. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * The name of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +""" +Enhancements released under IPA GPLv2 only license +""" + +# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text, +# then build substrate from it +import sys, string, base64 +from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful +from pyasn1.codec.der import decoder, encoder +from pyasn1 import error + +# Would be autogenerated from ASN.1 source by a ASN.1 parser +# X.509 spec (rfc2459) + +# Common OIDs found in a subject +oidtable = { "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + "1.2.840.113549.1.9.1": "E", + "0.9.2342.19200300.100.1.25": "DC", + } + +MAX = 64 # XXX ? + +class DirectoryString(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX + ) + +class AttributeValue(DirectoryString): pass + +class AttributeType(univ.ObjectIdentifier): pass + +class AttributeTypeAndValue(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('value', AttributeValue()) + ) + +class RelativeDistinguishedName(univ.SetOf): + componentType = AttributeTypeAndValue() + +class RDNSequence(univ.SequenceOf): + componentType = RelativeDistinguishedName() + +class Name(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('', RDNSequence()) + ) + + def get_components(self): + components = self.getComponentByPosition(0) + complist = [] + for idx in range(len(components)): + attrandvalue = components[idx].getComponentByPosition(0) + oid = attrandvalue.getComponentByPosition(0) + # FIXME, should handle any string type + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) + vout = value.prettyOut(value).decode('utf-8') + oidout = oid.prettyOut(oid).decode('utf-8') + c = ((oidtable.get(oidout, oidout), vout)) + complist.append(c) + + return tuple(complist) + +class AlgorithmIdentifier(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', univ.ObjectIdentifier()), + namedtype.OptionalNamedType('parameters', univ.Null()) + # XXX syntax screwed? +# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier()) + ) + +class Extension(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('extnID', univ.ObjectIdentifier()), + namedtype.DefaultedNamedType('critical', univ.Boolean('False')), + namedtype.NamedType('extnValue', univ.OctetString()) + ) + +class Extensions(univ.SequenceOf): + componentType = Extension() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class SubjectPublicKeyInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', AlgorithmIdentifier()), + namedtype.NamedType('subjectPublicKey', univ.BitString()) + ) + +class UniqueIdentifier(univ.BitString): pass + +class Time(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('utcTime', useful.UTCTime()), + namedtype.NamedType('generalTime', useful.GeneralizedTime()) + ) + +class Validity(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('notBefore', Time()), + namedtype.NamedType('notAfter', Time()) + ) + +class CertificateSerialNumber(univ.Integer): pass + +class Version(univ.Integer): + namedValues = namedval.NamedValues( + ('v1', 0), ('v2', 1), ('v3', 2) + ) + +class TBSCertificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))), + namedtype.NamedType('serialNumber', CertificateSerialNumber()), + namedtype.NamedType('signature', AlgorithmIdentifier()), + namedtype.NamedType('issuer', Name()), + namedtype.NamedType('validity', Validity()), + namedtype.NamedType('subject', Name()), + namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), + namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))) + ) + +class Certificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('tbsCertificate', TBSCertificate()), + namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), + namedtype.NamedType('signatureValue', univ.BitString()) + ) + + def get_version(self): + info = self.getComponentByName('tbsCertificate') + version = info.getComponentByName('version') + return version._value + + def get_subject(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('subject') + + def get_serial_number(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('serialNumber') + +# end of ASN.1 data structures + +def strip_header(pem): + """ + Remove the header and footer from a certificate. + """ + s = pem.find("-----BEGIN CERTIFICATE-----") + if s >= 0: + e = pem.find("-----END CERTIFICATE-----") + pem = pem[s+27:e] + + return pem + + +def load_certificate(pem): + """ + Given a base64-encoded certificate, with or without the + header/footer, return a request object. + """ + pem = strip_header(pem) + + substrate = base64.b64decode(pem) + + return decoder.decode(substrate, asn1Spec=Certificate())[0] + +def get_subject_components(certificate): + """ + Load an X509.3 certificate and get the subject. + + Return a tuple of a certificate subject. + (('CN', u'www.example.com', ('O', u'IPA')) + """ + + # Grab the subject, reverse it, combine it and return it + x509cert = load_certificate(certificate) + return x509cert.get_subject().get_components() + +def get_serial_number(certificate): + """ + Return the serial number of a certificate. + + Returns an integer + """ + x509cert = load_certificate(certificate) + return x509cert.get_serial_number() + +if __name__ == '__main__': + certType = Certificate() + + # Read PEM certs from stdin and print them out in plain text + + stSpam, stHam, stDump = 0, 1, 2 + state = stSpam + certCnt = 0 + + for certLine in sys.stdin.readlines(): + certLine = string.strip(certLine) + if state == stSpam: + if state == stSpam: + if certLine == '-----BEGIN CERTIFICATE-----': + certLines = [] + state = stHam + continue + if state == stHam: + if certLine == '-----END CERTIFICATE-----': + state = stDump + else: + certLines.append(certLine) + if state == stDump: + substrate = '' + for certLine in certLines: + substrate = substrate + base64.b64decode(certLine) + + cert = decoder.decode(substrate, asn1Spec=certType)[0] + print cert.prettyPrint() + + assert encoder.encode(cert) == substrate, 'cert recode fails' + + certCnt = certCnt + 1 + state = stSpam + + print '*** %s PEM cert(s) de/serialized' % certCnt |