From ab1667f3c1607a22c6df49ceba58274347bc5826 Mon Sep 17 00:00:00 2001 From: Rob Crittenden Date: Tue, 24 Nov 2009 16:07:44 -0500 Subject: Use pyasn1-based PKCS#10 and X509v3 parsers instead of pyOpenSSL. The pyOpenSSL PKCS#10 parser doesn't support attributes so we can't identify requests with subject alt names. Subject alt names are only allowed if: - the host for the alt name exists in IPA - if binding as host principal, the host is in the services managedBy attr --- ipalib/x509.py | 272 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 ipalib/x509.py (limited to 'ipalib/x509.py') diff --git a/ipalib/x509.py b/ipalib/x509.py new file mode 100644 index 000000000..ee9ceb3e0 --- /dev/null +++ b/ipalib/x509.py @@ -0,0 +1,272 @@ +""" +Imported from pyasn1 project: + +Copyright (c) 2005-2009 Ilya Etingof , all rights reserved. + +THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION +ENDANGERING HUMAN LIFE OR PROPERTY. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + + * The name of the authors may not be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS'' +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +""" +Enhancements released under IPA GPLv2 only license +""" + +# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text, +# then build substrate from it +import sys, string, base64 +from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful +from pyasn1.codec.der import decoder, encoder +from pyasn1 import error + +# Would be autogenerated from ASN.1 source by a ASN.1 parser +# X.509 spec (rfc2459) + +# Common OIDs found in a subject +oidtable = { "2.5.4.3": "CN", + "2.5.4.6": "C", + "2.5.4.7": "L", + "2.5.4.8": "ST", + "2.5.4.10": "O", + "2.5.4.11": "OU", + "1.2.840.113549.1.9.1": "E", + "0.9.2342.19200300.100.1.25": "DC", + } + +MAX = 64 # XXX ? + +class DirectoryString(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), + namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX + ) + +class AttributeValue(DirectoryString): pass + +class AttributeType(univ.ObjectIdentifier): pass + +class AttributeTypeAndValue(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('type', AttributeType()), + namedtype.NamedType('value', AttributeValue()) + ) + +class RelativeDistinguishedName(univ.SetOf): + componentType = AttributeTypeAndValue() + +class RDNSequence(univ.SequenceOf): + componentType = RelativeDistinguishedName() + +class Name(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('', RDNSequence()) + ) + + def get_components(self): + components = self.getComponentByPosition(0) + complist = [] + for idx in range(len(components)): + attrandvalue = components[idx].getComponentByPosition(0) + oid = attrandvalue.getComponentByPosition(0) + # FIXME, should handle any string type + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) + if value is None: + value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) + vout = value.prettyOut(value).decode('utf-8') + oidout = oid.prettyOut(oid).decode('utf-8') + c = ((oidtable.get(oidout, oidout), vout)) + complist.append(c) + + return tuple(complist) + +class AlgorithmIdentifier(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', univ.ObjectIdentifier()), + namedtype.OptionalNamedType('parameters', univ.Null()) + # XXX syntax screwed? +# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier()) + ) + +class Extension(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('extnID', univ.ObjectIdentifier()), + namedtype.DefaultedNamedType('critical', univ.Boolean('False')), + namedtype.NamedType('extnValue', univ.OctetString()) + ) + +class Extensions(univ.SequenceOf): + componentType = Extension() + sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) + +class SubjectPublicKeyInfo(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('algorithm', AlgorithmIdentifier()), + namedtype.NamedType('subjectPublicKey', univ.BitString()) + ) + +class UniqueIdentifier(univ.BitString): pass + +class Time(univ.Choice): + componentType = namedtype.NamedTypes( + namedtype.NamedType('utcTime', useful.UTCTime()), + namedtype.NamedType('generalTime', useful.GeneralizedTime()) + ) + +class Validity(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('notBefore', Time()), + namedtype.NamedType('notAfter', Time()) + ) + +class CertificateSerialNumber(univ.Integer): pass + +class Version(univ.Integer): + namedValues = namedval.NamedValues( + ('v1', 0), ('v2', 1), ('v3', 2) + ) + +class TBSCertificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))), + namedtype.NamedType('serialNumber', CertificateSerialNumber()), + namedtype.NamedType('signature', AlgorithmIdentifier()), + namedtype.NamedType('issuer', Name()), + namedtype.NamedType('validity', Validity()), + namedtype.NamedType('subject', Name()), + namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), + namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), + namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), + namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3))) + ) + +class Certificate(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType('tbsCertificate', TBSCertificate()), + namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), + namedtype.NamedType('signatureValue', univ.BitString()) + ) + + def get_version(self): + info = self.getComponentByName('tbsCertificate') + version = info.getComponentByName('version') + return version._value + + def get_subject(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('subject') + + def get_serial_number(self): + info = self.getComponentByName('tbsCertificate') + return info.getComponentByName('serialNumber') + +# end of ASN.1 data structures + +def strip_header(pem): + """ + Remove the header and footer from a certificate. + """ + s = pem.find("-----BEGIN CERTIFICATE-----") + if s >= 0: + e = pem.find("-----END CERTIFICATE-----") + pem = pem[s+27:e] + + return pem + + +def load_certificate(pem): + """ + Given a base64-encoded certificate, with or without the + header/footer, return a request object. + """ + pem = strip_header(pem) + + substrate = base64.b64decode(pem) + + return decoder.decode(substrate, asn1Spec=Certificate())[0] + +def get_subject_components(certificate): + """ + Load an X509.3 certificate and get the subject. + + Return a tuple of a certificate subject. + (('CN', u'www.example.com', ('O', u'IPA')) + """ + + # Grab the subject, reverse it, combine it and return it + x509cert = load_certificate(certificate) + return x509cert.get_subject().get_components() + +def get_serial_number(certificate): + """ + Return the serial number of a certificate. + + Returns an integer + """ + x509cert = load_certificate(certificate) + return x509cert.get_serial_number() + +if __name__ == '__main__': + certType = Certificate() + + # Read PEM certs from stdin and print them out in plain text + + stSpam, stHam, stDump = 0, 1, 2 + state = stSpam + certCnt = 0 + + for certLine in sys.stdin.readlines(): + certLine = string.strip(certLine) + if state == stSpam: + if state == stSpam: + if certLine == '-----BEGIN CERTIFICATE-----': + certLines = [] + state = stHam + continue + if state == stHam: + if certLine == '-----END CERTIFICATE-----': + state = stDump + else: + certLines.append(certLine) + if state == stDump: + substrate = '' + for certLine in certLines: + substrate = substrate + base64.b64decode(certLine) + + cert = decoder.decode(substrate, asn1Spec=certType)[0] + print cert.prettyPrint() + + assert encoder.encode(cert) == substrate, 'cert recode fails' + + certCnt = certCnt + 1 + state = stSpam + + print '*** %s PEM cert(s) de/serialized' % certCnt -- cgit