diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-11-24 16:07:44 -0500 |
---|---|---|
committer | Jason Gerard DeRose <jderose@redhat.com> | 2009-11-30 18:10:09 -0700 |
commit | ab1667f3c1607a22c6df49ceba58274347bc5826 (patch) | |
tree | bc2e6102d3d9cd103d2418ad5372e164e0e7533d | |
parent | 7c2c2d6130648fb6dd7c0e52d802cc6eff39ef95 (diff) | |
download | freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.gz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.tar.xz freeipa-ab1667f3c1607a22c6df49ceba58274347bc5826.zip |
Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL.
The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify
requests with subject alt names.
Subject alt names are only allowed if:
- the host for the alt name exists in IPA
- if binding as host principal, the host is in the services managedBy attr
-rwxr-xr-x | install/tools/ipa-server-install | 4 | ||||
-rw-r--r-- | ipalib/pkcs10.py | 439 | ||||
-rw-r--r-- | ipalib/plugins/cert.py | 90 | ||||
-rw-r--r-- | ipalib/plugins/service.py | 10 | ||||
-rw-r--r-- | ipalib/x509.py | 272 | ||||
-rw-r--r-- | ipaserver/plugins/selfsign.py | 19 | ||||
-rw-r--r-- | tests/test_pkcs10/__init__.py | 22 | ||||
-rw-r--r-- | tests/test_pkcs10/test0.csr | 12 | ||||
-rw-r--r-- | tests/test_pkcs10/test1.csr | 13 | ||||
-rw-r--r-- | tests/test_pkcs10/test2.csr | 15 | ||||
-rw-r--r-- | tests/test_pkcs10/test_pkcs10.py | 119 |
11 files changed, 983 insertions, 32 deletions
diff --git a/install/tools/ipa-server-install b/install/tools/ipa-server-install index be525f73d..0b2660f3a 100755 --- a/install/tools/ipa-server-install +++ b/install/tools/ipa-server-install @@ -787,6 +787,10 @@ def main(): service.print_msg("restarting the KDC") krb.restart() + # Restart httpd to pick up the new IPA configuration + service.print_msg("restarting the web server") + http.restart() + # Create a BIND instance bind = bindinstance.BindInstance(fstore, dm_password) bind.setup(host_name, ip_address, realm_name, domain_name, dns_forwarders) diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py new file mode 100644 index 000000000..f3f82c40d --- /dev/null +++ b/ipalib/pkcs10.py @@ -0,0 +1,439 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +# Read PKCS#10 certificate requests (see RFC 2986 and 5280) + +# NOTE: Not every extension is currently handled. Known to now work: +# 2.5.29.37 - extKeyUsage + +import sys, string, base64 +from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful +from pyasn1.codec.der import decoder, encoder +from pyasn1 import error +import copy + +# Common OIDs found in a subject +oidtable = { "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + "1.2.840.113549.1.9.1": "E", + "0.9.2342.19200300.100.1.25": "DC", + } + +# Some useful OIDs +FRIENDLYNAME = '1.2.840.113549.1.9.20' +EXTENSIONREQUEST = '1.2.840.113549.1.9.14' + +MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h + +class DirectoryString(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + ) + +class AttributeValue(DirectoryString): pass + +class AttributeType(univ.ObjectIdentifier): pass + +class AttributeTypeAndValue(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type + ) + +class KeyPurposeId(univ.ObjectIdentifier): pass + +class ExtKeyUsageSyntax(univ.SequenceOf): + componentType = KeyPurposeId() + +class UPN(char.UTF8String): + tagSet = char.UTF8String.tagSet.tagExplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) + ) + +class AttributeValueSet(univ.SetOf): + componentType = univ.Any() + sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class Attribute(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('values', AttributeValueSet()), + ) + +class Attributes(univ.SetOf): + componentType = Attribute() + +class RelativeDistinguishedName(univ.SetOf): + componentType = AttributeTypeAndValue() + +class RDNSequence(univ.SequenceOf): + componentType = RelativeDistinguishedName() + +class Name(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('', RDNSequence()) + ) + + def get_components(self): + components = self.getComponentByPosition(0) + complist = [] + for idx in range(len(components)): + attrandvalue = components[idx].getComponentByPosition(0) + oid = attrandvalue.getComponentByPosition(0) + # FIXME, should handle any string type + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) + vout = value.prettyOut(value).decode('utf-8') + oidout = oid.prettyOut(oid).decode('utf-8') + c = ((oidtable.get(oidout, oidout), vout)) + complist.append(c) + + return tuple(complist) + +class AnotherName(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type-id', univ.ObjectIdentifier()), + namedtype.NamedType('value', univ.Any()) + ) + +class rfc822Name(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1) + ) + +class dNSName(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2) + ) + +class x400Address(univ.OctetString): + tagSet = univ.OctetString.tagSet.tagImplicitly( + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3) + ) + +class directoryName(Name): + tagSet = Name.tagSet.tagImplicitly( + tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4) + ) + +class uniformResourceIdentifier(char.IA5String): + tagSet = char.IA5String.tagSet.tagImplicitly( + tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6) + ) + +# Not all general types are handled, nor are these necessarily done +# per the specification. +class GeneralName(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('rfc822Name', rfc822Name()), #1 + namedtype.NamedType('dNSName', dNSName()), #2 + namedtype.NamedType('x400Address', x400Address()), #3 + namedtype.NamedType('directoryName', directoryName()), #4 + # 5 FIXME + namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6 +# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))), + ) + +class GeneralNames(univ.SequenceOf): + componentType = GeneralName() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class SubjectAltName(univ.SequenceOf): + componentType = GeneralName() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class DistributionPointName(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + ) + +class DistributionPoint(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), + namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME + namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + ) + +class cRLDistributionPoints(univ.SequenceOf): + componentType = DistributionPoint() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class basicConstraints(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.DefaultedNamedType('cA', univ.Boolean('False')), + namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()), + ) + +class AlgorithmIdentifier(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', univ.ObjectIdentifier()), + namedtype.OptionalNamedType('parameters', univ.Any()) + ) + +class SubjectPublicKeyInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', AlgorithmIdentifier()), + namedtype.NamedType('subjectPublicKey', univ.BitString()) + ) + +class Version(univ.Integer): pass + +class CertificationRequestInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('version', Version()), + namedtype.NamedType('subject', Name()), + namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), + namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))) + ) + +class CertificationRequest(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()), + namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), + namedtype.NamedType('signatureValue', univ.BitString()) + ) + + def get_version(self): + info = self.getComponentByName('certificationRequestInfo') + version = info.getComponentByName('version') + return version._value + + def get_subject(self): + info = self.getComponentByName('certificationRequestInfo') + return info.getComponentByName('subject') + + def get_subjectaltname(self): + attrs = self.get_attributes() + attrdict = dict(attrs) + if EXTENSIONREQUEST in attrdict: + # Extensions are a 3 position tuple + for ext in attrdict[EXTENSIONREQUEST]: + if ext[0] == '2.5.29.17': + # alt name is in the dNSName position + return ext[2][2] + + def get_attributes(self): + info = self.getComponentByName('certificationRequestInfo') + attrs = info.getComponentByName('attributes') + attributes = [] + + for idx in range(len(attrs)): + atype = attrs[idx].getComponentByPosition(0) + aval = attrs[idx].getComponentByPosition(1) + + # The attribute list is of type Any, need to re-encode + aenc = encoder.encode(aval, maxChunkSize=1024) + decoded = decoder.decode(aenc)[0] + oid = atype.prettyOut(atype) + + if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name + value = decoded.getComponentByPosition(0) + t = (oid, value.prettyOut(value).decode('utf-8')) + attributes.append(t) + elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req + extensions = [] + extlist = decoded.getComponentByPosition(0) + for jdx in range(len(extlist)): + ext = extlist.getComponentByPosition(jdx) + # An extension has 3 elements: + # oid + # bool - critical + # value + if len(ext) == 2: # If no critical, default to False + extoid = atype.prettyOut(ext.getComponentByPosition(0)) + critical = False + extvalue = ext.getComponentByPosition(1) + else: + extoid = atype.prettyOut(ext.getComponentByPosition(0)) + critical = bool(ext.getComponentByPosition(1)._value) + extvalue = ext.getComponentByPosition(2) + + if extoid == '2.5.29.19': # basicConstraints + extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0] + ca = bool(extdecoded[0]) + if len(extdecoded) == 2: # path length is optional + pathlen = extdecoded[1]._value + else: + pathlen = None + constraint = (ca, pathlen) + e = (extoid, critical, constraint) + extensions.append(e) + continue + elif extoid == '2.5.29.31': # cRLDistributionPoints + extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0] + distpoints = [] + for elem in range(len(extdecoded)): + name = extdecoded[elem] + # DistributionPoint is position 0 + distpoint = name.getComponentByPosition(0) + # fullName is position 0 + fullname = distpoint.getComponentByPosition(0) + for crl in range(len(fullname)): + # Get the GeneralName, URI type + uri = fullname.getComponentByPosition(crl).getComponentByPosition(5) + distpoints.append(uri.prettyOut(uri).decode('utf-8')) + e = (extoid, critical, tuple(distpoints)) + extensions.append(e) + continue + + # The data is is encoded as "Any". Pull the raw data out + # and re-decode it using a different specification. + try: + extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0] + except error.PyAsn1Error: + # I've seen CSRs where this isn't a sequence of names + # but is a single name, try to handle that too. + try: + extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0] + extdecoded = [extdecoded] + except error.PyAsn1Error, e: + # skip for now + generalnames = 9*["Error"] + e = (extoid, critical, tuple(generalnames)) + extensions.append(e) + continue + + # We now have a list of extensions in the order they + # are in the request as GeneralNames. We iterate through + # each of those to get a GeneralName. We then have to + # iterate through that to find the position set in it. + + # Note that not every type will be returned. Those that + # are handled are returned in a tuple in the position + # which they are in the request. + generalnames = 9*[None] + for elem in range(len(extdecoded)): + name = extdecoded[elem] + for n in range(len(name)): + if name[n] is None: + continue + if generalnames[n] is None: + generalnames[n] = [] + if n == 3: # OctetString + generalnames[n].append(name[n]._value) + if n in [1, 2, 6]: # IA5String + if n == 6 and extoid == "2.5.29.37": + # Extended key usage + v = copy.deepcopy(extvalue._value) + othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0] + keyusage = [] + for l in range(len(othername)): + keyusage.append(othername[l].prettyOut(othername[l])) + + generalnames[n] = tuple(keyusage) + else: + generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8')) + if n == 0: # AnotherName + nameoid = name[n].getComponentByPosition(0) + nameoid = nameoid.prettyOut(nameoid) + val = name[n].getComponentByPosition(1) + if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN + v = copy.deepcopy(val._value) + othername = decoder.decode(v, asn1Spec=UPN())[0] + generalnames[0].append(othername.prettyOut(othername).decode('utf-8')) + + e = (extoid, critical, tuple(generalnames)) + extensions.append(e) + t = (oid, tuple(extensions)) + attributes.append(t) + + return tuple(attributes) + +def strip_header(csr): + """ + Remove the header and footer from a CSR. + """ + s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") + if s == -1: + s = csr.find("-----BEGIN CERTIFICATE REQUEST-----") + if s >= 0: + e = csr.find("-----END") + csr = csr[s+40:e] + + return csr + +def load_certificate_request(csr): + """ + Given a base64-encoded certificate request, with or without the + header/footer, return a request object. + """ + csr = strip_header(csr) + + substrate = base64.b64decode(csr) + + return decoder.decode(substrate, asn1Spec=CertificationRequest())[0] + +if __name__ == '__main__': + # Read PEM certs from stdin and print them out in plain text + + stSpam, stHam, stDump = 0, 1, 2 + state = stSpam + + for certLine in sys.stdin.readlines(): + certLine = string.strip(certLine) + if state == stSpam: + if state == stSpam: + if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----': + certLines = [] + state = stHam + continue + if state == stHam: + if certLine == '-----END NEW CERTIFICATE REQUEST-----': + state = stDump + else: + certLines.append(certLine) + complist = [] + if state == stDump: + substrate = '' + for certLine in certLines: + substrate = substrate + base64.b64decode(certLine) + + request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0] + subject = request.get_subject() + attrs = request.get_attributes() + print "Attributes:" + print attrs + + print "Subject:" + complist = subject.get_components() + print complist + out="" + for c in complist: + out = out + "%s=%s," % (c[0], c[1]) + print out[:-1] + + print request.get_subjectaltname() + + # Re-encode the request just to be sure things are working + assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails' + + state = stSpam diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 48ceaa6d7..ba088dd96 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -28,12 +28,16 @@ if api.env.enable_ra is not True: raise SkipPluginModule(reason='env.enable_ra is not True') from ipalib import Command, Str, Int, Bytes, Flag, File from ipalib import errors +from ipalib import pkcs10 +from ipalib import x509 from ipalib.plugins.virtual import * from ipalib.plugins.service import split_principal import base64 -from OpenSSL import crypto from ipalib.request import context from ipapython import dnsclient +from pyasn1.error import PyAsn1Error +import logging +import traceback def get_serial(certificate): """ @@ -45,9 +49,8 @@ def get_serial(certificate): if type(certificate) in (list, tuple): certificate = certificate[0] try: - x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) - serial = str(x509.get_serial_number()) - except crypto.Error: + serial = str(x509.get_serial_number(certificate)) + except PyAsn1Error: raise errors.GenericError(format='Unable to decode certificate in entry') return serial @@ -57,25 +60,49 @@ def get_csr_hostname(csr): Return the value of CN in the subject of the request """ try: - der = base64.b64decode(csr) - request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der) + request = pkcs10.load_certificate_request(csr) sub = request.get_subject().get_components() for s in sub: if s[0].lower() == "cn": return s[1] - except crypto.Error, e: - raise errors.GenericError(format='Unable to decode CSR: %s' % str(e)) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') return None +def get_subjectaltname(csr): + """ + Return the value of the subject alt name, if any + """ + try: + request = pkcs10.load_certificate_request(csr) + except PyAsn1Error: + # The ASN.1 decoding errors tend to be long and involved and the + # last bit is generally not interesting. We need the whole traceback. + logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + return request.get_subjectaltname() + def validate_csr(ugettext, csr): """ - For now just verify that it is properly base64-encoded. + Ensure the CSR is base64-encoded and can be decoded by our PKCS#10 + parser. """ try: - base64.b64decode(csr) - except Exception, e: + request = pkcs10.load_certificate_request(csr) + + # Explicitly request the attributes. This fires off additional + # decoding to get things like the subjectAltName. + attrs = request.get_attributes() + except TypeError, e: raise errors.Base64DecodeError(reason=str(e)) + except PyAsn1Error: + raise errors.GenericError(format='Failure decoding Certificate Signing Request') + except Exception, e: + raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e)) class cert_request(VirtualCommand): @@ -107,38 +134,43 @@ class cert_request(VirtualCommand): def execute(self, csr, **kw): ldap = self.api.Backend.ldap2 - skw = {"all": True} principal = kw.get('principal') add = kw.get('add') del kw['principal'] del kw['add'] service = None - # We just want the CSR bits, make sure there is nothing else - s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----") - e = csr.find("-----END NEW CERTIFICATE REQUEST-----") - if s >= 0: - csr = csr[s+40:e] + """ + Access control is partially handled by the ACI titled + 'Hosts can modify service userCertificate'. This is for the case + where a machine binds using a host/ prinicpal. It can only do the + request if the target hostname is in the managedBy attribute which + is managed using the add/del member commands. + + Binding with a user principal one needs to be in the request_certs + taskgroup (directly or indirectly via role membership). + """ # Can this user request certs? self.check_access() # FIXME: add support for subject alt name - # Is this cert for this principal? - subject_host = get_csr_hostname(csr) # Ensure that the hostname in the CSR matches the principal + subject_host = get_csr_hostname(csr) (servicename, hostname, realm) = split_principal(principal) if subject_host.lower() != hostname.lower(): raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname)) + dn = None + service = None # See if the service exists and punt if it doesn't and we aren't # going to add it try: - (dn, service) = api.Command['service_show'](principal, **skw) + (dn, service) = api.Command['service_show'](principal, all=True, raw=True) if 'usercertificate' in service: # FIXME, what to do here? Do we revoke the old cert? - raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate'])) + raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0]))) except errors.NotFound, e: if not add: raise errors.NotFound(reason="The service principal for this request doesn't exist.") @@ -151,6 +183,22 @@ class cert_request(VirtualCommand): if not ldap.can_write(dn, "usercertificate"): raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn) + # Validate the subject alt name, if any + subjectaltname = get_subjectaltname(csr) + if subjectaltname is not None: + for name in subjectaltname: + try: + (hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True) + except errors.NotFound: + # We don't want to issue any certificates referencing + # machines we don't know about. Nothing is stored in this + # host record related to this certificate. + raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name) + authprincipal = getattr(context, 'principal') + if authprincipal.startswith("host/"): + if not hostdn in service.get('managedby', []): + raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name) + # Request the certificate result = self.Backend.ra.request_certificate(csr, **kw) diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 449acbaec..c88695e42 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -23,11 +23,10 @@ Services (Identity) """ import base64 -from OpenSSL import crypto - from ipalib import api, errors from ipalib import Str, Flag, Bytes from ipalib.plugins.baseldap import * +from ipalib import x509 def get_serial(certificate): @@ -35,8 +34,7 @@ def get_serial(certificate): Given a certificate, return the serial number in that cert. """ try: - x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) - serial = str(x509.get_serial_number()) + serial = str(x509.get_serial_number(certificate)) except crypto.Error: raise errors.GenericError( format='Unable to decode certificate in entry' @@ -247,7 +245,7 @@ api.register(service_show) class service_add_host(LDAPAddMember): """ - Add members to service. + Add hosts that can manage this service. """ member_attributes = ['managedby'] @@ -256,7 +254,7 @@ api.register(service_add_host) class service_remove_host(LDAPRemoveMember): """ - Remove members from service. + Remove hosts that can manage this service. """ member_attributes = ['managedby'] diff --git a/ipalib/x509.py b/ipalib/x509.py new file mode 100644 index 000000000..ee9ceb3e0 --- /dev/null +++ b/ipalib/x509.py @@ -0,0 +1,272 @@ +""" +Imported from pyasn1 project: + +Copyright (c) 2005-2009 Ilya Etingof <ilya@glas.net>, all rights reserved. + +THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION +ENDANGERING HUMAN LIFE OR PROPERTY. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * The name of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +""" +Enhancements released under IPA GPLv2 only license +""" + +# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text, +# then build substrate from it +import sys, string, base64 +from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful +from pyasn1.codec.der import decoder, encoder +from pyasn1 import error + +# Would be autogenerated from ASN.1 source by a ASN.1 parser +# X.509 spec (rfc2459) + +# Common OIDs found in a subject +oidtable = { "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + "1.2.840.113549.1.9.1": "E", + "0.9.2342.19200300.100.1.25": "DC", + } + +MAX = 64 # XXX ? + +class DirectoryString(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX + ) + +class AttributeValue(DirectoryString): pass + +class AttributeType(univ.ObjectIdentifier): pass + +class AttributeTypeAndValue(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('value', AttributeValue()) + ) + +class RelativeDistinguishedName(univ.SetOf): + componentType = AttributeTypeAndValue() + +class RDNSequence(univ.SequenceOf): + componentType = RelativeDistinguishedName() + +class Name(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('', RDNSequence()) + ) + + def get_components(self): + components = self.getComponentByPosition(0) + complist = [] + for idx in range(len(components)): + attrandvalue = components[idx].getComponentByPosition(0) + oid = attrandvalue.getComponentByPosition(0) + # FIXME, should handle any string type + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) + vout = value.prettyOut(value).decode('utf-8') + oidout = oid.prettyOut(oid).decode('utf-8') + c = ((oidtable.get(oidout, oidout), vout)) + complist.append(c) + + return tuple(complist) + +class AlgorithmIdentifier(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', univ.ObjectIdentifier()), + namedtype.OptionalNamedType('parameters', univ.Null()) + # XXX syntax screwed? +# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier()) + ) + +class Extension(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('extnID', univ.ObjectIdentifier()), + namedtype.DefaultedNamedType('critical', univ.Boolean('False')), + namedtype.NamedType('extnValue', univ.OctetString()) + ) + +class Extensions(univ.SequenceOf): + componentType = Extension() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class SubjectPublicKeyInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', AlgorithmIdentifier()), + namedtype.NamedType('subjectPublicKey', univ.BitString()) + ) + +class UniqueIdentifier(univ.BitString): pass + +class Time(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('utcTime', useful.UTCTime()), + namedtype.NamedType('generalTime', useful.GeneralizedTime()) + ) + +class Validity(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('notBefore', Time()), + namedtype.NamedType('notAfter', Time()) + ) + +class CertificateSerialNumber(univ.Integer): pass + +class Version(univ.Integer): + namedValues = namedval.NamedValues( + ('v1', 0), ('v2', 1), ('v3', 2) + ) + +class TBSCertificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))), + namedtype.NamedType('serialNumber', CertificateSerialNumber()), + namedtype.NamedType('signature', AlgorithmIdentifier()), + namedtype.NamedType('issuer', Name()), + namedtype.NamedType('validity', Validity()), + namedtype.NamedType('subject', Name()), + namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), + namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))) + ) + +class Certificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('tbsCertificate', TBSCertificate()), + namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), + namedtype.NamedType('signatureValue', univ.BitString()) + ) + + def get_version(self): + info = self.getComponentByName('tbsCertificate') + version = info.getComponentByName('version') + return version._value + + def get_subject(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('subject') + + def get_serial_number(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('serialNumber') + +# end of ASN.1 data structures + +def strip_header(pem): + """ + Remove the header and footer from a certificate. + """ + s = pem.find("-----BEGIN CERTIFICATE-----") + if s >= 0: + e = pem.find("-----END CERTIFICATE-----") + pem = pem[s+27:e] + + return pem + + +def load_certificate(pem): + """ + Given a base64-encoded certificate, with or without the + header/footer, return a request object. + """ + pem = strip_header(pem) + + substrate = base64.b64decode(pem) + + return decoder.decode(substrate, asn1Spec=Certificate())[0] + +def get_subject_components(certificate): + """ + Load an X509.3 certificate and get the subject. + + Return a tuple of a certificate subject. + (('CN', u'www.example.com', ('O', u'IPA')) + """ + + # Grab the subject, reverse it, combine it and return it + x509cert = load_certificate(certificate) + return x509cert.get_subject().get_components() + +def get_serial_number(certificate): + """ + Return the serial number of a certificate. + + Returns an integer + """ + x509cert = load_certificate(certificate) + return x509cert.get_serial_number() + +if __name__ == '__main__': + certType = Certificate() + + # Read PEM certs from stdin and print them out in plain text + + stSpam, stHam, stDump = 0, 1, 2 + state = stSpam + certCnt = 0 + + for certLine in sys.stdin.readlines(): + certLine = string.strip(certLine) + if state == stSpam: + if state == stSpam: + if certLine == '-----BEGIN CERTIFICATE-----': + certLines = [] + state = stHam + continue + if state == stHam: + if certLine == '-----END CERTIFICATE-----': + state = stDump + else: + certLines.append(certLine) + if state == stDump: + substrate = '' + for certLine in certLines: + substrate = substrate + base64.b64decode(certLine) + + cert = decoder.decode(substrate, asn1Spec=certType)[0] + print cert.prettyPrint() + + assert encoder.encode(cert) == substrate, 'cert recode fails' + + certCnt = certCnt + 1 + state = stSpam + + print '*** %s PEM cert(s) de/serialized' % certCnt diff --git a/ipaserver/plugins/selfsign.py b/ipaserver/plugins/selfsign.py index 0ba7a7c44..d4b2efcf7 100644 --- a/ipaserver/plugins/selfsign.py +++ b/ipaserver/plugins/selfsign.py @@ -36,12 +36,13 @@ if api.env.ra_plugin != 'selfsign': raise SkipPluginModule(reason='selfsign is not selected as RA plugin, it is %s' % api.env.ra_plugin) from ipalib import Backend from ipalib import errors +from ipalib import x509 import subprocess import os from ipaserver.plugins import rabase from ipaserver.install import certs import tempfile -from OpenSSL import crypto +from pyasn1 import error class ra(rabase.rabase): """ @@ -56,6 +57,15 @@ class ra(rabase.rabase): :param request_type: The request type (defaults to ``'pkcs10'``). """ (csr_fd, csr_name) = tempfile.mkstemp() + + # certutil wants the CSR to have have a header and footer. Add one + # if it isn't there. + s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----') + if s == -1: + s = csr.find('-----BEGIN CERTIFICATE REQUEST-----') + if s == -1: + csr = '-----BEGIN NEW CERTIFICATE REQUEST-----\n' + csr + \ + '-----END NEW CERTIFICATE REQUEST-----\n' os.write(csr_fd, csr) os.close(csr_fd) (cert_fd, cert_name) = tempfile.mkstemp() @@ -101,16 +111,15 @@ class ra(rabase.rabase): try: # Grab the subject, reverse it, combine it and return it - x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - sub = x509.get_subject().get_components() + sub = list(x509.get_subject_components(cert)) sub.reverse() subject = "" for s in sub: subject = subject + "%s=%s," % (s[0], s[1]) subject = subject[:-1] - serial = x509.get_serial_number() - except crypto.Error, e: + serial = x509.get_serial_number(cert) + except error.PyAsn1Error, e: raise errors.GenericError(format='Unable to decode certificate in entry: %s' % str(e)) # To make it look like dogtag return just the base64 data. diff --git a/tests/test_pkcs10/__init__.py b/tests/test_pkcs10/__init__.py new file mode 100644 index 000000000..cd1209dc8 --- /dev/null +++ b/tests/test_pkcs10/__init__.py @@ -0,0 +1,22 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" +Sub-package containing unit tests for `pkcs10` package. +""" diff --git a/tests/test_pkcs10/test0.csr b/tests/test_pkcs10/test0.csr new file mode 100644 index 000000000..eadfb70b4 --- /dev/null +++ b/tests/test_pkcs10/test0.csr @@ -0,0 +1,12 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIBjjCB+AIBADBPMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEQ +MA4GA1UEChMHRXhhbXBsZTEZMBcGA1UEAxMQdGVzdC5leGFtcGxlLmNvbTCBnzAN +BgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAyxsN5dmvyKiw+5nyrcO3a61sivZRg+ja +kyNIyUo+tIUiYwTdpPESAHTWRlk0XhydauAkWfOIN7pR3a5Z+kQw8W7F+DuZze2M +6wRNmN+NTrTlqnKOiMHBXhIM0Qxrx68GDctYqtnKTVT94FvvLl9XYVdUEi2ePTc2 +Nyfr1z66+W0CAwEAAaAAMA0GCSqGSIb3DQEBBQUAA4GBAIf3r+Y6WHrFnttUqDow +9/UCHtCeQlQoJqjjxi5wcjbkGwTgHbx/BPOd/8OVaHElboMXLGaZx+L/eFO6E9Yg +mDOYv3OsibDFGaEhJrU8EnfuFZKnbrGeSC9Hkqrq+3OjqacaPla5N7MHKbfLY377 +ddbOHKzR0sURZ+ro4z3fATW2 +-----END NEW CERTIFICATE REQUEST----- + diff --git a/tests/test_pkcs10/test1.csr b/tests/test_pkcs10/test1.csr new file mode 100644 index 000000000..0dad3ae1e --- /dev/null +++ b/tests/test_pkcs10/test1.csr @@ -0,0 +1,13 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIBwDCCASkCAQAwTzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWEx +EDAOBgNVBAoTB0V4YW1wbGUxGTAXBgNVBAMTEHRlc3QuZXhhbXBsZS5jb20wgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMK+3uy1CGwek8jutw4UO62YTpkmStlw +cKPEjTER7Ra1a1wyWJTo1mMnPhVia0GODeq8ERPgcIckCVogBu8+gL6g8NevaBNv +ij1XWU08BEQqmoqAkrFiI8EdDckKYrSoXo2cg1fiTGzlG8AWtr5eT0op5jBBo0J6 +qXX5Sf6e+n+nAgMBAAGgMTAvBgkqhkiG9w0BCQ4xIjAgMB4GA1UdEQQXMBWCE3Rl +c3Rsb3cuZXhhbXBsZS5jb20wDQYJKoZIhvcNAQEFBQADgYEAwRDa7ZOaym9mAUH7 +hudbvsRkqXHehgf51uMUq0OC9hQ6vPLWqUMAod05lxn3Tnvq6a/fVK0ybgCH5Ld7 +qpAcUruYdj7YxkFfuBc1dpAK6h94rVsJXFCWIMEZm9Fe7n5RERjhO6h2IRSXBHFz +QIszvqBamm/W1ONKdQSM2g+M4BQ= +-----END NEW CERTIFICATE REQUEST----- + diff --git a/tests/test_pkcs10/test2.csr b/tests/test_pkcs10/test2.csr new file mode 100644 index 000000000..ccc47f890 --- /dev/null +++ b/tests/test_pkcs10/test2.csr @@ -0,0 +1,15 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +MIICETCCAXoCAQAwTzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWEx +EDAOBgNVBAoTB0V4YW1wbGUxGTAXBgNVBAMTEHRlc3QuZXhhbXBsZS5jb20wgZ8w +DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOXfP8LeiU7g6wLCclgkT1lVskK+Lxm1 +6ijE4LmEQBk5nn2P46im+E/UOgTddbDo5cdJlkoCnqXkO4RkqJckXYDxfI34KL3C +CRFPvOa5Sg02m1x5Rg3boZfS6NciP62lRp0SI+0TCt3F16wYZxMahVIOXjbJ6Lu5 +mGjNn7XaWJhFAgMBAAGggYEwfwYJKoZIhvcNAQkOMXIwcDAeBgNVHREEFzAVghN0 +ZXN0bG93LmV4YW1wbGUuY29tME4GA1UdHwRHMEUwQ6BBoD+GHGh0dHA6Ly9jYS5l +eGFtcGxlLmNvbS9teS5jcmyGH2h0dHA6Ly9vdGhlci5leGFtcGxlLmNvbS9teS5j +cmwwDQYJKoZIhvcNAQEFBQADgYEAkv8pppcgGhX7erJmvg9r2UHrRriuKaOYgKZQ +lf/eBt2N0L2mV4QvCY82H7HWuE+7T3mra9ikfvz0nYkPJQe2gntjZzECE0Jt5LWR +UZOFwX8N6wrX11U2xu0NlvsbjU6siWd6OZjZ1p5/V330lzut/q3CNzaAcW1Fx3wL +sV5SXSw= +-----END NEW CERTIFICATE REQUEST----- + diff --git a/tests/test_pkcs10/test_pkcs10.py b/tests/test_pkcs10/test_pkcs10.py new file mode 100644 index 000000000..66d205b96 --- /dev/null +++ b/tests/test_pkcs10/test_pkcs10.py @@ -0,0 +1,119 @@ +# Authors: +# Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2009 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +""" +Test the `pkcs10.py` module. +""" + +import os +import sys +import nose +from tests.util import raises, PluginTester +from ipalib import pkcs10 +from ipapython import ipautil + +class test_update(object): + """ + Test the PKCS#10 Parser. + """ + + def setUp(self): + if ipautil.file_exists("test0.csr"): + self.testdir="./" + elif ipautil.file_exists("tests/test_pkcs10/test0.csr"): + self.testdir= "./tests/test_pkcs10/" + else: + raise nose.SkipTest("Unable to find test update files") + + def read_file(self, filename): + fp = open(self.testdir + filename, "r") + data = fp.read() + fp.close() + return data + + def test_0(self): + """ + Test simple CSR with no attributes + """ + csr = self.read_file("test0.csr") + request = pkcs10.load_certificate_request(csr) + + attributes = request.get_attributes() + subject = request.get_subject() + components = subject.get_components() + compdict = dict(components) + + assert(attributes == ()) + assert(compdict['CN'] == u'test.example.com') + assert(compdict['ST'] == u'California') + assert(compdict['C'] == u'US') + + def test_1(self): + """ + Test CSR with subject alt name + """ + csr = self.read_file("test1.csr") + request = pkcs10.load_certificate_request(csr) + + attributes = request.get_attributes() + subject = request.get_subject() + components = subject.get_components() + compdict = dict(components) + attrdict = dict(attributes) + + assert(compdict['CN'] == u'test.example.com') + assert(compdict['ST'] == u'California') + assert(compdict['C'] == u'US') + + extensions = attrdict['1.2.840.113549.1.9.14'] + + for ext in range(len(extensions)): + if extensions[ext][0] == '2.5.29.17': + names = extensions[ext][2] + # check the dNSName field + assert(names[2] == [u'testlow.example.com']) + + def test_2(self): + """ + Test CSR with subject alt name and a list of CRL distribution points + """ + csr = self.read_file("test2.csr") + request = pkcs10.load_certificate_request(csr) + + attributes = request.get_attributes() + subject = request.get_subject() + components = subject.get_components() + compdict = dict(components) + attrdict = dict(attributes) + + assert(compdict['CN'] == u'test.example.com') + assert(compdict['ST'] == u'California') + assert(compdict['C'] == u'US') + + extensions = attrdict['1.2.840.113549.1.9.14'] + + for ext in range(len(extensions)): + if extensions[ext][0] == '2.5.29.17': + names = extensions[ext][2] + # check the dNSName field + assert(names[2] == [u'testlow.example.com']) + if extensions[ext][0] == '2.5.29.31': + urls = extensions[ext][2] + assert(len(urls) == 2) + assert(urls[0] == u'http://ca.example.com/my.crl') + assert(urls[1] == u'http://other.example.com/my.crl') |