summaryrefslogtreecommitdiffstats
path: root/ipalib/pkcs10.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/pkcs10.py')
-rw-r--r--ipalib/pkcs10.py426
1 files changed, 40 insertions, 386 deletions
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py
index 9119d12e2..3011c1f7a 100644
--- a/ipalib/pkcs10.py
+++ b/ipalib/pkcs10.py
@@ -1,7 +1,7 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
-# Copyright (C) 2009 Red Hat
+# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -17,356 +17,34 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# Read PKCS#10 certificate requests (see RFC 2986 and 5280)
+import os
+import sys
+import base64
+import nss.nss as nss
+from ipapython import ipautil
+from ipalib import api
-# NOTE: Not every extension is currently handled. Known to now work:
-# 2.5.29.37 - extKeyUsage
+PEM = 0
+DER = 1
-import sys, string, base64
-from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful
-from pyasn1.codec.der import decoder, encoder
-from pyasn1 import error
-import copy
-
-# Common OIDs found in a subject
-oidtable = { "2.5.4.3": "CN",
- "2.5.4.6": "C",
- "2.5.4.7": "L",
- "2.5.4.8": "ST",
- "2.5.4.10": "O",
- "2.5.4.11": "OU",
- "1.2.840.113549.1.9.1": "E",
- "0.9.2342.19200300.100.1.25": "DC",
- }
-
-# Some useful OIDs
-FRIENDLYNAME = '1.2.840.113549.1.9.20'
-EXTENSIONREQUEST = '1.2.840.113549.1.9.14'
-
-MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h
-
-class DirectoryString(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- )
-
-class AttributeValue(DirectoryString): pass
-
-class AttributeType(univ.ObjectIdentifier): pass
-
-class AttributeTypeAndValue(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type
- )
-
-class KeyPurposeId(univ.ObjectIdentifier): pass
-
-class ExtKeyUsageSyntax(univ.SequenceOf):
- componentType = KeyPurposeId()
-
-class UPN(char.UTF8String):
- tagSet = char.UTF8String.tagSet.tagExplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
- )
-
-class AttributeValueSet(univ.SetOf):
- componentType = univ.Any()
- sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class Attribute(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('values', AttributeValueSet()),
- )
-
-class Attributes(univ.SetOf):
- componentType = Attribute()
-
-class RelativeDistinguishedName(univ.SetOf):
- componentType = AttributeTypeAndValue()
-
-class RDNSequence(univ.SequenceOf):
- componentType = RelativeDistinguishedName()
-
-class Name(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('', RDNSequence())
- )
-
- def get_components(self):
- components = self.getComponentByPosition(0)
- complist = []
- for idx in range(len(components)):
- attrandvalue = components[idx].getComponentByPosition(0)
- oid = attrandvalue.getComponentByPosition(0)
- # FIXME, should handle any string type
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
- vout = value.prettyOut(value).decode('utf-8')
- oidout = oid.prettyOut(oid).decode('utf-8')
- c = ((oidtable.get(oidout, oidout), vout))
- complist.append(c)
-
- return tuple(complist)
-
-class AnotherName(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type-id', univ.ObjectIdentifier()),
- namedtype.NamedType('value', univ.Any())
- )
-
-class rfc822Name(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)
- )
-
-class dNSName(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
- )
-
-class x400Address(univ.OctetString):
- tagSet = univ.OctetString.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3)
- )
-
-class directoryName(Name):
- tagSet = Name.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4)
- )
-
-class uniformResourceIdentifier(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)
- )
-
-# Not all general types are handled, nor are these necessarily done
-# per the specification.
-class GeneralName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('rfc822Name', rfc822Name()), #1
- namedtype.NamedType('dNSName', dNSName()), #2
- namedtype.NamedType('x400Address', x400Address()), #3
- namedtype.NamedType('directoryName', directoryName()), #4
- # 5 FIXME
- namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6
-# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))),
- )
-
-class GeneralNames(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class SubjectAltName(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class DistributionPointName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
- )
-
-class DistributionPoint(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME
- namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
- )
-
-class cRLDistributionPoints(univ.SequenceOf):
- componentType = DistributionPoint()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class basicConstraints(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.DefaultedNamedType('cA', univ.Boolean('False')),
- namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()),
- )
-
-class AlgorithmIdentifier(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
- namedtype.OptionalNamedType('parameters', univ.Any())
- )
-
-class SubjectPublicKeyInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', AlgorithmIdentifier()),
- namedtype.NamedType('subjectPublicKey', univ.BitString())
- )
-
-class Version(univ.Integer): pass
-
-class CertificationRequestInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('version', Version()),
- namedtype.NamedType('subject', Name()),
- namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
- namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
- )
-
-class CertificationRequest(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()),
- namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
- namedtype.NamedType('signatureValue', univ.BitString())
- )
-
- def get_version(self):
- info = self.getComponentByName('certificationRequestInfo')
- version = info.getComponentByName('version')
- return version._value
-
- def get_subject(self):
- info = self.getComponentByName('certificationRequestInfo')
- return info.getComponentByName('subject')
-
- def get_subjectaltname(self):
- attrs = self.get_attributes()
- attrdict = dict(attrs)
- if EXTENSIONREQUEST in attrdict:
- # Extensions are a 3 position tuple
- for ext in attrdict[EXTENSIONREQUEST]:
- if ext[0] == '2.5.29.17':
- # alt name is in the dNSName position
- return ext[2][2]
-
- def get_attributes(self):
- info = self.getComponentByName('certificationRequestInfo')
- attrs = info.getComponentByName('attributes')
- attributes = []
-
- for idx in range(len(attrs)):
- atype = attrs[idx].getComponentByPosition(0)
- aval = attrs[idx].getComponentByPosition(1)
-
- # The attribute list is of type Any, need to re-encode
- aenc = encoder.encode(aval, maxChunkSize=1024)
- decoded = decoder.decode(aenc)[0]
- oid = atype.prettyOut(atype)
-
- if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name
- value = decoded.getComponentByPosition(0)
- t = (oid, value.prettyOut(value).decode('utf-8'))
- attributes.append(t)
- elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req
- extensions = []
- extlist = decoded.getComponentByPosition(0)
- for jdx in range(len(extlist)):
- ext = extlist.getComponentByPosition(jdx)
- # An extension has 3 elements:
- # oid
- # bool - critical
- # value
- if len(ext) == 2: # If no critical, default to False
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = False
- extvalue = ext.getComponentByPosition(1)
- else:
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = bool(ext.getComponentByPosition(1)._value)
- extvalue = ext.getComponentByPosition(2)
-
- if extoid == '2.5.29.19': # basicConstraints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0]
- ca = bool(extdecoded[0])
- if len(extdecoded) == 2: # path length is optional
- pathlen = extdecoded[1]._value
- else:
- pathlen = None
- constraint = (ca, pathlen)
- e = (extoid, critical, constraint)
- extensions.append(e)
- continue
- elif extoid == '2.5.29.31': # cRLDistributionPoints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0]
- distpoints = []
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- # DistributionPoint is position 0
- distpoint = name.getComponentByPosition(0)
- # fullName is position 0
- fullname = distpoint.getComponentByPosition(0)
- for crl in range(len(fullname)):
- # Get the GeneralName, URI type
- uri = fullname.getComponentByPosition(crl).getComponentByPosition(5)
- distpoints.append(uri.prettyOut(uri).decode('utf-8'))
- e = (extoid, critical, tuple(distpoints))
- extensions.append(e)
- continue
-
- # The data is is encoded as "Any". Pull the raw data out
- # and re-decode it using a different specification.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0]
- except error.PyAsn1Error:
- # I've seen CSRs where this isn't a sequence of names
- # but is a single name, try to handle that too.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0]
- extdecoded = [extdecoded]
- except error.PyAsn1Error, e:
- # skip for now
- generalnames = 9*["Error"]
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- continue
-
- # We now have a list of extensions in the order they
- # are in the request as GeneralNames. We iterate through
- # each of those to get a GeneralName. We then have to
- # iterate through that to find the position set in it.
-
- # Note that not every type will be returned. Those that
- # are handled are returned in a tuple in the position
- # which they are in the request.
- generalnames = 9*[None]
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- for n in range(len(name)):
- if name[n] is None:
- continue
- if generalnames[n] is None:
- generalnames[n] = []
- if n == 3: # OctetString
- generalnames[n].append(name[n]._value)
- if n in [1, 2, 6]: # IA5String
- if n == 6 and extoid == "2.5.29.37":
- # Extended key usage
- v = copy.deepcopy(extvalue._value)
- othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0]
- keyusage = []
- for l in range(len(othername)):
- keyusage.append(othername[l].prettyOut(othername[l]))
+def get_subjectaltname(request):
+ """
+ Given a CSR return the subjectaltname value, if any.
- generalnames[n] = tuple(keyusage)
- else:
- generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8'))
- if n == 0: # AnotherName
- nameoid = name[n].getComponentByPosition(0)
- nameoid = nameoid.prettyOut(nameoid)
- val = name[n].getComponentByPosition(1)
- if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN
- v = copy.deepcopy(val._value)
- othername = decoder.decode(v, asn1Spec=UPN())[0]
- generalnames[0].append(othername.prettyOut(othername).decode('utf-8'))
+ The return value is a tuple of strings or None
+ """
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ return nss.x509_alt_name(extension.value)
+ return None
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- t = (oid, tuple(extensions))
- attributes.append(t)
+def get_subject(request):
+ """
+ Given a CSR return the subject value.
- return tuple(attributes)
+ This returns an nss.DN object.
+ """
+ return request.subject
def strip_header(csr):
"""
@@ -392,50 +70,26 @@ def load_certificate_request(csr):
substrate = base64.b64decode(csr)
- return decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
+ # A fail-safe so we can always read a CSR. python-nss/NSS will segfault
+ # otherwise
+ if not nss.nss_is_initialized():
+ nss.nss_init_nodb()
-if __name__ == '__main__':
- # Read PEM certs from stdin and print them out in plain text
-
- stSpam, stHam, stDump = 0, 1, 2
- state = stSpam
+ return nss.CertificateRequest(substrate)
- for certLine in sys.stdin.readlines():
- certLine = string.strip(certLine)
- if state == stSpam:
- if state == stSpam:
- if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----':
- certLines = []
- state = stHam
- continue
- if state == stHam:
- if certLine == '-----END NEW CERTIFICATE REQUEST-----':
- state = stDump
- else:
- certLines.append(certLine)
- complist = []
- if state == stDump:
- substrate = ''
- for certLine in certLines:
- substrate = substrate + base64.b64decode(certLine)
+if __name__ == '__main__':
+ nss.nss_init_nodb()
- request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
- subject = request.get_subject()
- attrs = request.get_attributes()
- print "Attributes:"
- print attrs
+ # Read PEM request from stdin and print out its components
- print "Subject:"
- complist = subject.get_components()
- print complist
- out=""
- for c in complist:
- out = out + "%s=%s," % (c[0], c[1])
- print out[:-1]
+ csrlines = sys.stdin.readlines()
+ csrlines = fp.readlines()
+ fp.close()
+ csr = ''.join(csrlines)
- print request.get_subjectaltname()
+ csr = load_certificate_request(csr)
- # Re-encode the request just to be sure things are working
- assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails'
+ print csr
- state = stSpam
+ print get_subject(csr)
+ print get_subjectaltname(csr)