summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xinstall/tools/ipa-server-install4
-rw-r--r--ipalib/pkcs10.py439
-rw-r--r--ipalib/plugins/cert.py90
-rw-r--r--ipalib/plugins/service.py10
-rw-r--r--ipalib/x509.py272
-rw-r--r--ipaserver/plugins/selfsign.py19
-rw-r--r--tests/test_pkcs10/__init__.py22
-rw-r--r--tests/test_pkcs10/test0.csr12
-rw-r--r--tests/test_pkcs10/test1.csr13
-rw-r--r--tests/test_pkcs10/test2.csr15
-rw-r--r--tests/test_pkcs10/test_pkcs10.py119
11 files changed, 983 insertions, 32 deletions
diff --git a/install/tools/ipa-server-install b/install/tools/ipa-server-install
index be525f73d..0b2660f3a 100755
--- a/install/tools/ipa-server-install
+++ b/install/tools/ipa-server-install
@@ -787,6 +787,10 @@ def main():
service.print_msg("restarting the KDC")
krb.restart()
+ # Restart httpd to pick up the new IPA configuration
+ service.print_msg("restarting the web server")
+ http.restart()
+
# Create a BIND instance
bind = bindinstance.BindInstance(fstore, dm_password)
bind.setup(host_name, ip_address, realm_name, domain_name, dns_forwarders)
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py
new file mode 100644
index 000000000..f3f82c40d
--- /dev/null
+++ b/ipalib/pkcs10.py
@@ -0,0 +1,439 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+# Read PKCS#10 certificate requests (see RFC 2986 and 5280)
+
+# NOTE: Not every extension is currently handled. Known to now work:
+# 2.5.29.37 - extKeyUsage
+
+import sys, string, base64
+from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful
+from pyasn1.codec.der import decoder, encoder
+from pyasn1 import error
+import copy
+
+# Common OIDs found in a subject
+oidtable = { "2.5.4.3": "CN",
+ "2.5.4.6": "C",
+ "2.5.4.7": "L",
+ "2.5.4.8": "ST",
+ "2.5.4.10": "O",
+ "2.5.4.11": "OU",
+ "1.2.840.113549.1.9.1": "E",
+ "0.9.2342.19200300.100.1.25": "DC",
+ }
+
+# Some useful OIDs
+FRIENDLYNAME = '1.2.840.113549.1.9.20'
+EXTENSIONREQUEST = '1.2.840.113549.1.9.14'
+
+MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h
+
+class DirectoryString(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ )
+
+class AttributeValue(DirectoryString): pass
+
+class AttributeType(univ.ObjectIdentifier): pass
+
+class AttributeTypeAndValue(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('type', AttributeType()),
+ namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type
+ )
+
+class KeyPurposeId(univ.ObjectIdentifier): pass
+
+class ExtKeyUsageSyntax(univ.SequenceOf):
+ componentType = KeyPurposeId()
+
+class UPN(char.UTF8String):
+ tagSet = char.UTF8String.tagSet.tagExplicitly(
+ tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
+ )
+
+class AttributeValueSet(univ.SetOf):
+ componentType = univ.Any()
+ sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
+
+class Attribute(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('type', AttributeType()),
+ namedtype.NamedType('values', AttributeValueSet()),
+ )
+
+class Attributes(univ.SetOf):
+ componentType = Attribute()
+
+class RelativeDistinguishedName(univ.SetOf):
+ componentType = AttributeTypeAndValue()
+
+class RDNSequence(univ.SequenceOf):
+ componentType = RelativeDistinguishedName()
+
+class Name(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('', RDNSequence())
+ )
+
+ def get_components(self):
+ components = self.getComponentByPosition(0)
+ complist = []
+ for idx in range(len(components)):
+ attrandvalue = components[idx].getComponentByPosition(0)
+ oid = attrandvalue.getComponentByPosition(0)
+ # FIXME, should handle any string type
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
+ if value is None:
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
+ if value is None:
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
+ vout = value.prettyOut(value).decode('utf-8')
+ oidout = oid.prettyOut(oid).decode('utf-8')
+ c = ((oidtable.get(oidout, oidout), vout))
+ complist.append(c)
+
+ return tuple(complist)
+
+class AnotherName(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('type-id', univ.ObjectIdentifier()),
+ namedtype.NamedType('value', univ.Any())
+ )
+
+class rfc822Name(char.IA5String):
+ tagSet = char.IA5String.tagSet.tagImplicitly(
+ tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)
+ )
+
+class dNSName(char.IA5String):
+ tagSet = char.IA5String.tagSet.tagImplicitly(
+ tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
+ )
+
+class x400Address(univ.OctetString):
+ tagSet = univ.OctetString.tagSet.tagImplicitly(
+ tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3)
+ )
+
+class directoryName(Name):
+ tagSet = Name.tagSet.tagImplicitly(
+ tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4)
+ )
+
+class uniformResourceIdentifier(char.IA5String):
+ tagSet = char.IA5String.tagSet.tagImplicitly(
+ tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)
+ )
+
+# Not all general types are handled, nor are these necessarily done
+# per the specification.
+class GeneralName(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
+ namedtype.NamedType('rfc822Name', rfc822Name()), #1
+ namedtype.NamedType('dNSName', dNSName()), #2
+ namedtype.NamedType('x400Address', x400Address()), #3
+ namedtype.NamedType('directoryName', directoryName()), #4
+ # 5 FIXME
+ namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6
+# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))),
+ )
+
+class GeneralNames(univ.SequenceOf):
+ componentType = GeneralName()
+ sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
+
+class SubjectAltName(univ.SequenceOf):
+ componentType = GeneralName()
+ sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
+
+class DistributionPointName(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
+ namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
+ )
+
+class DistributionPoint(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
+ namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME
+ namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
+ )
+
+class cRLDistributionPoints(univ.SequenceOf):
+ componentType = DistributionPoint()
+ sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
+
+class basicConstraints(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.DefaultedNamedType('cA', univ.Boolean('False')),
+ namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()),
+ )
+
+class AlgorithmIdentifier(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
+ namedtype.OptionalNamedType('parameters', univ.Any())
+ )
+
+class SubjectPublicKeyInfo(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('algorithm', AlgorithmIdentifier()),
+ namedtype.NamedType('subjectPublicKey', univ.BitString())
+ )
+
+class Version(univ.Integer): pass
+
+class CertificationRequestInfo(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('version', Version()),
+ namedtype.NamedType('subject', Name()),
+ namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
+ namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
+ )
+
+class CertificationRequest(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()),
+ namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
+ namedtype.NamedType('signatureValue', univ.BitString())
+ )
+
+ def get_version(self):
+ info = self.getComponentByName('certificationRequestInfo')
+ version = info.getComponentByName('version')
+ return version._value
+
+ def get_subject(self):
+ info = self.getComponentByName('certificationRequestInfo')
+ return info.getComponentByName('subject')
+
+ def get_subjectaltname(self):
+ attrs = self.get_attributes()
+ attrdict = dict(attrs)
+ if EXTENSIONREQUEST in attrdict:
+ # Extensions are a 3 position tuple
+ for ext in attrdict[EXTENSIONREQUEST]:
+ if ext[0] == '2.5.29.17':
+ # alt name is in the dNSName position
+ return ext[2][2]
+
+ def get_attributes(self):
+ info = self.getComponentByName('certificationRequestInfo')
+ attrs = info.getComponentByName('attributes')
+ attributes = []
+
+ for idx in range(len(attrs)):
+ atype = attrs[idx].getComponentByPosition(0)
+ aval = attrs[idx].getComponentByPosition(1)
+
+ # The attribute list is of type Any, need to re-encode
+ aenc = encoder.encode(aval, maxChunkSize=1024)
+ decoded = decoder.decode(aenc)[0]
+ oid = atype.prettyOut(atype)
+
+ if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name
+ value = decoded.getComponentByPosition(0)
+ t = (oid, value.prettyOut(value).decode('utf-8'))
+ attributes.append(t)
+ elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req
+ extensions = []
+ extlist = decoded.getComponentByPosition(0)
+ for jdx in range(len(extlist)):
+ ext = extlist.getComponentByPosition(jdx)
+ # An extension has 3 elements:
+ # oid
+ # bool - critical
+ # value
+ if len(ext) == 2: # If no critical, default to False
+ extoid = atype.prettyOut(ext.getComponentByPosition(0))
+ critical = False
+ extvalue = ext.getComponentByPosition(1)
+ else:
+ extoid = atype.prettyOut(ext.getComponentByPosition(0))
+ critical = bool(ext.getComponentByPosition(1)._value)
+ extvalue = ext.getComponentByPosition(2)
+
+ if extoid == '2.5.29.19': # basicConstraints
+ extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0]
+ ca = bool(extdecoded[0])
+ if len(extdecoded) == 2: # path length is optional
+ pathlen = extdecoded[1]._value
+ else:
+ pathlen = None
+ constraint = (ca, pathlen)
+ e = (extoid, critical, constraint)
+ extensions.append(e)
+ continue
+ elif extoid == '2.5.29.31': # cRLDistributionPoints
+ extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0]
+ distpoints = []
+ for elem in range(len(extdecoded)):
+ name = extdecoded[elem]
+ # DistributionPoint is position 0
+ distpoint = name.getComponentByPosition(0)
+ # fullName is position 0
+ fullname = distpoint.getComponentByPosition(0)
+ for crl in range(len(fullname)):
+ # Get the GeneralName, URI type
+ uri = fullname.getComponentByPosition(crl).getComponentByPosition(5)
+ distpoints.append(uri.prettyOut(uri).decode('utf-8'))
+ e = (extoid, critical, tuple(distpoints))
+ extensions.append(e)
+ continue
+
+ # The data is is encoded as "Any". Pull the raw data out
+ # and re-decode it using a different specification.
+ try:
+ extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0]
+ except error.PyAsn1Error:
+ # I've seen CSRs where this isn't a sequence of names
+ # but is a single name, try to handle that too.
+ try:
+ extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0]
+ extdecoded = [extdecoded]
+ except error.PyAsn1Error, e:
+ # skip for now
+ generalnames = 9*["Error"]
+ e = (extoid, critical, tuple(generalnames))
+ extensions.append(e)
+ continue
+
+ # We now have a list of extensions in the order they
+ # are in the request as GeneralNames. We iterate through
+ # each of those to get a GeneralName. We then have to
+ # iterate through that to find the position set in it.
+
+ # Note that not every type will be returned. Those that
+ # are handled are returned in a tuple in the position
+ # which they are in the request.
+ generalnames = 9*[None]
+ for elem in range(len(extdecoded)):
+ name = extdecoded[elem]
+ for n in range(len(name)):
+ if name[n] is None:
+ continue
+ if generalnames[n] is None:
+ generalnames[n] = []
+ if n == 3: # OctetString
+ generalnames[n].append(name[n]._value)
+ if n in [1, 2, 6]: # IA5String
+ if n == 6 and extoid == "2.5.29.37":
+ # Extended key usage
+ v = copy.deepcopy(extvalue._value)
+ othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0]
+ keyusage = []
+ for l in range(len(othername)):
+ keyusage.append(othername[l].prettyOut(othername[l]))
+
+ generalnames[n] = tuple(keyusage)
+ else:
+ generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8'))
+ if n == 0: # AnotherName
+ nameoid = name[n].getComponentByPosition(0)
+ nameoid = nameoid.prettyOut(nameoid)
+ val = name[n].getComponentByPosition(1)
+ if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN
+ v = copy.deepcopy(val._value)
+ othername = decoder.decode(v, asn1Spec=UPN())[0]
+ generalnames[0].append(othername.prettyOut(othername).decode('utf-8'))
+
+ e = (extoid, critical, tuple(generalnames))
+ extensions.append(e)
+ t = (oid, tuple(extensions))
+ attributes.append(t)
+
+ return tuple(attributes)
+
+def strip_header(csr):
+ """
+ Remove the header and footer from a CSR.
+ """
+ s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----")
+ if s == -1:
+ s = csr.find("-----BEGIN CERTIFICATE REQUEST-----")
+ if s >= 0:
+ e = csr.find("-----END")
+ csr = csr[s+40:e]
+
+ return csr
+
+def load_certificate_request(csr):
+ """
+ Given a base64-encoded certificate request, with or without the
+ header/footer, return a request object.
+ """
+ csr = strip_header(csr)
+
+ substrate = base64.b64decode(csr)
+
+ return decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
+
+if __name__ == '__main__':
+ # Read PEM certs from stdin and print them out in plain text
+
+ stSpam, stHam, stDump = 0, 1, 2
+ state = stSpam
+
+ for certLine in sys.stdin.readlines():
+ certLine = string.strip(certLine)
+ if state == stSpam:
+ if state == stSpam:
+ if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----':
+ certLines = []
+ state = stHam
+ continue
+ if state == stHam:
+ if certLine == '-----END NEW CERTIFICATE REQUEST-----':
+ state = stDump
+ else:
+ certLines.append(certLine)
+ complist = []
+ if state == stDump:
+ substrate = ''
+ for certLine in certLines:
+ substrate = substrate + base64.b64decode(certLine)
+
+ request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
+ subject = request.get_subject()
+ attrs = request.get_attributes()
+ print "Attributes:"
+ print attrs
+
+ print "Subject:"
+ complist = subject.get_components()
+ print complist
+ out=""
+ for c in complist:
+ out = out + "%s=%s," % (c[0], c[1])
+ print out[:-1]
+
+ print request.get_subjectaltname()
+
+ # Re-encode the request just to be sure things are working
+ assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails'
+
+ state = stSpam
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 48ceaa6d7..ba088dd96 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -28,12 +28,16 @@ if api.env.enable_ra is not True:
raise SkipPluginModule(reason='env.enable_ra is not True')
from ipalib import Command, Str, Int, Bytes, Flag, File
from ipalib import errors
+from ipalib import pkcs10
+from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
-from OpenSSL import crypto
from ipalib.request import context
from ipapython import dnsclient
+from pyasn1.error import PyAsn1Error
+import logging
+import traceback
def get_serial(certificate):
"""
@@ -45,9 +49,8 @@ def get_serial(certificate):
if type(certificate) in (list, tuple):
certificate = certificate[0]
try:
- x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate)
- serial = str(x509.get_serial_number())
- except crypto.Error:
+ serial = str(x509.get_serial_number(certificate))
+ except PyAsn1Error:
raise errors.GenericError(format='Unable to decode certificate in entry')
return serial
@@ -57,25 +60,49 @@ def get_csr_hostname(csr):
Return the value of CN in the subject of the request
"""
try:
- der = base64.b64decode(csr)
- request = crypto.load_certificate_request(crypto.FILETYPE_ASN1, der)
+ request = pkcs10.load_certificate_request(csr)
sub = request.get_subject().get_components()
for s in sub:
if s[0].lower() == "cn":
return s[1]
- except crypto.Error, e:
- raise errors.GenericError(format='Unable to decode CSR: %s' % str(e))
+ except PyAsn1Error:
+ # The ASN.1 decoding errors tend to be long and involved and the
+ # last bit is generally not interesting. We need the whole traceback.
+ logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
return None
+def get_subjectaltname(csr):
+ """
+ Return the value of the subject alt name, if any
+ """
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except PyAsn1Error:
+ # The ASN.1 decoding errors tend to be long and involved and the
+ # last bit is generally not interesting. We need the whole traceback.
+ logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
+ return request.get_subjectaltname()
+
def validate_csr(ugettext, csr):
"""
- For now just verify that it is properly base64-encoded.
+ Ensure the CSR is base64-encoded and can be decoded by our PKCS#10
+ parser.
"""
try:
- base64.b64decode(csr)
- except Exception, e:
+ request = pkcs10.load_certificate_request(csr)
+
+ # Explicitly request the attributes. This fires off additional
+ # decoding to get things like the subjectAltName.
+ attrs = request.get_attributes()
+ except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
+ except PyAsn1Error:
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request')
+ except Exception, e:
+ raise errors.GenericError(format='Failure decoding Certificate Signing Request: %s' % str(e))
class cert_request(VirtualCommand):
@@ -107,38 +134,43 @@ class cert_request(VirtualCommand):
def execute(self, csr, **kw):
ldap = self.api.Backend.ldap2
- skw = {"all": True}
principal = kw.get('principal')
add = kw.get('add')
del kw['principal']
del kw['add']
service = None
- # We just want the CSR bits, make sure there is nothing else
- s = csr.find("-----BEGIN NEW CERTIFICATE REQUEST-----")
- e = csr.find("-----END NEW CERTIFICATE REQUEST-----")
- if s >= 0:
- csr = csr[s+40:e]
+ """
+ Access control is partially handled by the ACI titled
+ 'Hosts can modify service userCertificate'. This is for the case
+ where a machine binds using a host/ prinicpal. It can only do the
+ request if the target hostname is in the managedBy attribute which
+ is managed using the add/del member commands.
+
+ Binding with a user principal one needs to be in the request_certs
+ taskgroup (directly or indirectly via role membership).
+ """
# Can this user request certs?
self.check_access()
# FIXME: add support for subject alt name
- # Is this cert for this principal?
- subject_host = get_csr_hostname(csr)
# Ensure that the hostname in the CSR matches the principal
+ subject_host = get_csr_hostname(csr)
(servicename, hostname, realm) = split_principal(principal)
if subject_host.lower() != hostname.lower():
raise errors.ACIError(info="hostname in subject of request '%s' does not match principal hostname '%s'" % (subject_host, hostname))
+ dn = None
+ service = None
# See if the service exists and punt if it doesn't and we aren't
# going to add it
try:
- (dn, service) = api.Command['service_show'](principal, **skw)
+ (dn, service) = api.Command['service_show'](principal, all=True, raw=True)
if 'usercertificate' in service:
# FIXME, what to do here? Do we revoke the old cert?
- raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(service['usercertificate']))
+ raise errors.GenericError(format='entry already has a certificate, serial number %s' % get_serial(base64.b64encode(service['usercertificate'][0])))
except errors.NotFound, e:
if not add:
raise errors.NotFound(reason="The service principal for this request doesn't exist.")
@@ -151,6 +183,22 @@ class cert_request(VirtualCommand):
if not ldap.can_write(dn, "usercertificate"):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
+ # Validate the subject alt name, if any
+ subjectaltname = get_subjectaltname(csr)
+ if subjectaltname is not None:
+ for name in subjectaltname:
+ try:
+ (hostdn, hostentry) = api.Command['host_show'](name, all=True, raw=True)
+ except errors.NotFound:
+ # We don't want to issue any certificates referencing
+ # machines we don't know about. Nothing is stored in this
+ # host record related to this certificate.
+ raise errors.NotFound(reason='no host record for subject alt name %s in certificate request' % name)
+ authprincipal = getattr(context, 'principal')
+ if authprincipal.startswith("host/"):
+ if not hostdn in service.get('managedby', []):
+ raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
+
# Request the certificate
result = self.Backend.ra.request_certificate(csr, **kw)
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 449acbaec..c88695e42 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -23,11 +23,10 @@ Services (Identity)
"""
import base64
-from OpenSSL import crypto
-
from ipalib import api, errors
from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
+from ipalib import x509
def get_serial(certificate):
@@ -35,8 +34,7 @@ def get_serial(certificate):
Given a certificate, return the serial number in that cert.
"""
try:
- x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate)
- serial = str(x509.get_serial_number())
+ serial = str(x509.get_serial_number(certificate))
except crypto.Error:
raise errors.GenericError(
format='Unable to decode certificate in entry'
@@ -247,7 +245,7 @@ api.register(service_show)
class service_add_host(LDAPAddMember):
"""
- Add members to service.
+ Add hosts that can manage this service.
"""
member_attributes = ['managedby']
@@ -256,7 +254,7 @@ api.register(service_add_host)
class service_remove_host(LDAPRemoveMember):
"""
- Remove members from service.
+ Remove hosts that can manage this service.
"""
member_attributes = ['managedby']
diff --git a/ipalib/x509.py b/ipalib/x509.py
new file mode 100644
index 000000000..ee9ceb3e0
--- /dev/null
+++ b/ipalib/x509.py
@@ -0,0 +1,272 @@
+"""
+Imported from pyasn1 project:
+
+Copyright (c) 2005-2009 Ilya Etingof <ilya@glas.net>, all rights reserved.
+
+THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION
+ENDANGERING HUMAN LIFE OR PROPERTY.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * The name of the authors may not be used to endorse or promote products
+ derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
+THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+"""
+
+"""
+Enhancements released under IPA GPLv2 only license
+"""
+
+# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text,
+# then build substrate from it
+import sys, string, base64
+from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful
+from pyasn1.codec.der import decoder, encoder
+from pyasn1 import error
+
+# Would be autogenerated from ASN.1 source by a ASN.1 parser
+# X.509 spec (rfc2459)
+
+# Common OIDs found in a subject
+oidtable = { "2.5.4.3": "CN",
+ "2.5.4.6": "C",
+ "2.5.4.7": "L",
+ "2.5.4.8": "ST",
+ "2.5.4.10": "O",
+ "2.5.4.11": "OU",
+ "1.2.840.113549.1.9.1": "E",
+ "0.9.2342.19200300.100.1.25": "DC",
+ }
+
+MAX = 64 # XXX ?
+
+class DirectoryString(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
+ namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX
+ )
+
+class AttributeValue(DirectoryString): pass
+
+class AttributeType(univ.ObjectIdentifier): pass
+
+class AttributeTypeAndValue(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('type', AttributeType()),
+ namedtype.NamedType('value', AttributeValue())
+ )
+
+class RelativeDistinguishedName(univ.SetOf):
+ componentType = AttributeTypeAndValue()
+
+class RDNSequence(univ.SequenceOf):
+ componentType = RelativeDistinguishedName()
+
+class Name(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('', RDNSequence())
+ )
+
+ def get_components(self):
+ components = self.getComponentByPosition(0)
+ complist = []
+ for idx in range(len(components)):
+ attrandvalue = components[idx].getComponentByPosition(0)
+ oid = attrandvalue.getComponentByPosition(0)
+ # FIXME, should handle any string type
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
+ if value is None:
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
+ if value is None:
+ value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
+ vout = value.prettyOut(value).decode('utf-8')
+ oidout = oid.prettyOut(oid).decode('utf-8')
+ c = ((oidtable.get(oidout, oidout), vout))
+ complist.append(c)
+
+ return tuple(complist)
+
+class AlgorithmIdentifier(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
+ namedtype.OptionalNamedType('parameters', univ.Null())
+ # XXX syntax screwed?
+# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier())
+ )
+
+class Extension(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('extnID', univ.ObjectIdentifier()),
+ namedtype.DefaultedNamedType('critical', univ.Boolean('False')),
+ namedtype.NamedType('extnValue', univ.OctetString())
+ )
+
+class Extensions(univ.SequenceOf):
+ componentType = Extension()
+ sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
+
+class SubjectPublicKeyInfo(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('algorithm', AlgorithmIdentifier()),
+ namedtype.NamedType('subjectPublicKey', univ.BitString())
+ )
+
+class UniqueIdentifier(univ.BitString): pass
+
+class Time(univ.Choice):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('utcTime', useful.UTCTime()),
+ namedtype.NamedType('generalTime', useful.GeneralizedTime())
+ )
+
+class Validity(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('notBefore', Time()),
+ namedtype.NamedType('notAfter', Time())
+ )
+
+class CertificateSerialNumber(univ.Integer): pass
+
+class Version(univ.Integer):
+ namedValues = namedval.NamedValues(
+ ('v1', 0), ('v2', 1), ('v3', 2)
+ )
+
+class TBSCertificate(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))),
+ namedtype.NamedType('serialNumber', CertificateSerialNumber()),
+ namedtype.NamedType('signature', AlgorithmIdentifier()),
+ namedtype.NamedType('issuer', Name()),
+ namedtype.NamedType('validity', Validity()),
+ namedtype.NamedType('subject', Name()),
+ namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
+ namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
+ namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
+ namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)))
+ )
+
+class Certificate(univ.Sequence):
+ componentType = namedtype.NamedTypes(
+ namedtype.NamedType('tbsCertificate', TBSCertificate()),
+ namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
+ namedtype.NamedType('signatureValue', univ.BitString())
+ )
+
+ def get_version(self):
+ info = self.getComponentByName('tbsCertificate')
+ version = info.getComponentByName('version')
+ return version._value
+
+ def get_subject(self):
+ info = self.getComponentByName('tbsCertificate')
+ return info.getComponentByName('subject')
+
+ def get_serial_number(self):
+ info = self.getComponentByName('tbsCertificate')
+ return info.getComponentByName('serialNumber')
+
+# end of ASN.1 data structures
+
+def strip_header(pem):
+ """
+ Remove the header and footer from a certificate.
+ """
+ s = pem.find("-----BEGIN CERTIFICATE-----")
+ if s >= 0:
+ e = pem.find("-----END CERTIFICATE-----")
+ pem = pem[s+27:e]
+
+ return pem
+
+
+def load_certificate(pem):
+ """
+ Given a base64-encoded certificate, with or without the
+ header/footer, return a request object.
+ """
+ pem = strip_header(pem)
+
+ substrate = base64.b64decode(pem)
+
+ return decoder.decode(substrate, asn1Spec=Certificate())[0]
+
+def get_subject_components(certificate):
+ """
+ Load an X509.3 certificate and get the subject.
+
+ Return a tuple of a certificate subject.
+ (('CN', u'www.example.com', ('O', u'IPA'))
+ """
+
+ # Grab the subject, reverse it, combine it and return it
+ x509cert = load_certificate(certificate)
+ return x509cert.get_subject().get_components()
+
+def get_serial_number(certificate):
+ """
+ Return the serial number of a certificate.
+
+ Returns an integer
+ """
+ x509cert = load_certificate(certificate)
+ return x509cert.get_serial_number()
+
+if __name__ == '__main__':
+ certType = Certificate()
+
+ # Read PEM certs from stdin and print them out in plain text
+
+ stSpam, stHam, stDump = 0, 1, 2
+ state = stSpam
+ certCnt = 0
+
+ for certLine in sys.stdin.readlines():
+ certLine = string.strip(certLine)
+ if state == stSpam:
+ if state == stSpam:
+ if certLine == '-----BEGIN CERTIFICATE-----':
+ certLines = []
+ state = stHam
+ continue
+ if state == stHam:
+ if certLine == '-----END CERTIFICATE-----':
+ state = stDump
+ else:
+ certLines.append(certLine)
+ if state == stDump:
+ substrate = ''
+ for certLine in certLines:
+ substrate = substrate + base64.b64decode(certLine)
+
+ cert = decoder.decode(substrate, asn1Spec=certType)[0]
+ print cert.prettyPrint()
+
+ assert encoder.encode(cert) == substrate, 'cert recode fails'
+
+ certCnt = certCnt + 1
+ state = stSpam
+
+ print '*** %s PEM cert(s) de/serialized' % certCnt
diff --git a/ipaserver/plugins/selfsign.py b/ipaserver/plugins/selfsign.py
index 0ba7a7c44..d4b2efcf7 100644
--- a/ipaserver/plugins/selfsign.py
+++ b/ipaserver/plugins/selfsign.py
@@ -36,12 +36,13 @@ if api.env.ra_plugin != 'selfsign':
raise SkipPluginModule(reason='selfsign is not selected as RA plugin, it is %s' % api.env.ra_plugin)
from ipalib import Backend
from ipalib import errors
+from ipalib import x509
import subprocess
import os
from ipaserver.plugins import rabase
from ipaserver.install import certs
import tempfile
-from OpenSSL import crypto
+from pyasn1 import error
class ra(rabase.rabase):
"""
@@ -56,6 +57,15 @@ class ra(rabase.rabase):
:param request_type: The request type (defaults to ``'pkcs10'``).
"""
(csr_fd, csr_name) = tempfile.mkstemp()
+
+ # certutil wants the CSR to have have a header and footer. Add one
+ # if it isn't there.
+ s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----')
+ if s == -1:
+ s = csr.find('-----BEGIN CERTIFICATE REQUEST-----')
+ if s == -1:
+ csr = '-----BEGIN NEW CERTIFICATE REQUEST-----\n' + csr + \
+ '-----END NEW CERTIFICATE REQUEST-----\n'
os.write(csr_fd, csr)
os.close(csr_fd)
(cert_fd, cert_name) = tempfile.mkstemp()
@@ -101,16 +111,15 @@ class ra(rabase.rabase):
try:
# Grab the subject, reverse it, combine it and return it
- x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
- sub = x509.get_subject().get_components()
+ sub = list(x509.get_subject_components(cert))
sub.reverse()
subject = ""
for s in sub:
subject = subject + "%s=%s," % (s[0], s[1])
subject = subject[:-1]
- serial = x509.get_serial_number()
- except crypto.Error, e:
+ serial = x509.get_serial_number(cert)
+ except error.PyAsn1Error, e:
raise errors.GenericError(format='Unable to decode certificate in entry: %s' % str(e))
# To make it look like dogtag return just the base64 data.
diff --git a/tests/test_pkcs10/__init__.py b/tests/test_pkcs10/__init__.py
new file mode 100644
index 000000000..cd1209dc8
--- /dev/null
+++ b/tests/test_pkcs10/__init__.py
@@ -0,0 +1,22 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+"""
+Sub-package containing unit tests for `pkcs10` package.
+"""
diff --git a/tests/test_pkcs10/test0.csr b/tests/test_pkcs10/test0.csr
new file mode 100644
index 000000000..eadfb70b4
--- /dev/null
+++ b/tests/test_pkcs10/test0.csr
@@ -0,0 +1,12 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIIBjjCB+AIBADBPMQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ2FsaWZvcm5pYTEQ
+MA4GA1UEChMHRXhhbXBsZTEZMBcGA1UEAxMQdGVzdC5leGFtcGxlLmNvbTCBnzAN
+BgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAyxsN5dmvyKiw+5nyrcO3a61sivZRg+ja
+kyNIyUo+tIUiYwTdpPESAHTWRlk0XhydauAkWfOIN7pR3a5Z+kQw8W7F+DuZze2M
+6wRNmN+NTrTlqnKOiMHBXhIM0Qxrx68GDctYqtnKTVT94FvvLl9XYVdUEi2ePTc2
+Nyfr1z66+W0CAwEAAaAAMA0GCSqGSIb3DQEBBQUAA4GBAIf3r+Y6WHrFnttUqDow
+9/UCHtCeQlQoJqjjxi5wcjbkGwTgHbx/BPOd/8OVaHElboMXLGaZx+L/eFO6E9Yg
+mDOYv3OsibDFGaEhJrU8EnfuFZKnbrGeSC9Hkqrq+3OjqacaPla5N7MHKbfLY377
+ddbOHKzR0sURZ+ro4z3fATW2
+-----END NEW CERTIFICATE REQUEST-----
+
diff --git a/tests/test_pkcs10/test1.csr b/tests/test_pkcs10/test1.csr
new file mode 100644
index 000000000..0dad3ae1e
--- /dev/null
+++ b/tests/test_pkcs10/test1.csr
@@ -0,0 +1,13 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIIBwDCCASkCAQAwTzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWEx
+EDAOBgNVBAoTB0V4YW1wbGUxGTAXBgNVBAMTEHRlc3QuZXhhbXBsZS5jb20wgZ8w
+DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMK+3uy1CGwek8jutw4UO62YTpkmStlw
+cKPEjTER7Ra1a1wyWJTo1mMnPhVia0GODeq8ERPgcIckCVogBu8+gL6g8NevaBNv
+ij1XWU08BEQqmoqAkrFiI8EdDckKYrSoXo2cg1fiTGzlG8AWtr5eT0op5jBBo0J6
+qXX5Sf6e+n+nAgMBAAGgMTAvBgkqhkiG9w0BCQ4xIjAgMB4GA1UdEQQXMBWCE3Rl
+c3Rsb3cuZXhhbXBsZS5jb20wDQYJKoZIhvcNAQEFBQADgYEAwRDa7ZOaym9mAUH7
+hudbvsRkqXHehgf51uMUq0OC9hQ6vPLWqUMAod05lxn3Tnvq6a/fVK0ybgCH5Ld7
+qpAcUruYdj7YxkFfuBc1dpAK6h94rVsJXFCWIMEZm9Fe7n5RERjhO6h2IRSXBHFz
+QIszvqBamm/W1ONKdQSM2g+M4BQ=
+-----END NEW CERTIFICATE REQUEST-----
+
diff --git a/tests/test_pkcs10/test2.csr b/tests/test_pkcs10/test2.csr
new file mode 100644
index 000000000..ccc47f890
--- /dev/null
+++ b/tests/test_pkcs10/test2.csr
@@ -0,0 +1,15 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIICETCCAXoCAQAwTzELMAkGA1UEBhMCVVMxEzARBgNVBAgTCkNhbGlmb3JuaWEx
+EDAOBgNVBAoTB0V4YW1wbGUxGTAXBgNVBAMTEHRlc3QuZXhhbXBsZS5jb20wgZ8w
+DQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOXfP8LeiU7g6wLCclgkT1lVskK+Lxm1
+6ijE4LmEQBk5nn2P46im+E/UOgTddbDo5cdJlkoCnqXkO4RkqJckXYDxfI34KL3C
+CRFPvOa5Sg02m1x5Rg3boZfS6NciP62lRp0SI+0TCt3F16wYZxMahVIOXjbJ6Lu5
+mGjNn7XaWJhFAgMBAAGggYEwfwYJKoZIhvcNAQkOMXIwcDAeBgNVHREEFzAVghN0
+ZXN0bG93LmV4YW1wbGUuY29tME4GA1UdHwRHMEUwQ6BBoD+GHGh0dHA6Ly9jYS5l
+eGFtcGxlLmNvbS9teS5jcmyGH2h0dHA6Ly9vdGhlci5leGFtcGxlLmNvbS9teS5j
+cmwwDQYJKoZIhvcNAQEFBQADgYEAkv8pppcgGhX7erJmvg9r2UHrRriuKaOYgKZQ
+lf/eBt2N0L2mV4QvCY82H7HWuE+7T3mra9ikfvz0nYkPJQe2gntjZzECE0Jt5LWR
+UZOFwX8N6wrX11U2xu0NlvsbjU6siWd6OZjZ1p5/V330lzut/q3CNzaAcW1Fx3wL
+sV5SXSw=
+-----END NEW CERTIFICATE REQUEST-----
+
diff --git a/tests/test_pkcs10/test_pkcs10.py b/tests/test_pkcs10/test_pkcs10.py
new file mode 100644
index 000000000..66d205b96
--- /dev/null
+++ b/tests/test_pkcs10/test_pkcs10.py
@@ -0,0 +1,119 @@
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2009 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+"""
+Test the `pkcs10.py` module.
+"""
+
+import os
+import sys
+import nose
+from tests.util import raises, PluginTester
+from ipalib import pkcs10
+from ipapython import ipautil
+
+class test_update(object):
+ """
+ Test the PKCS#10 Parser.
+ """
+
+ def setUp(self):
+ if ipautil.file_exists("test0.csr"):
+ self.testdir="./"
+ elif ipautil.file_exists("tests/test_pkcs10/test0.csr"):
+ self.testdir= "./tests/test_pkcs10/"
+ else:
+ raise nose.SkipTest("Unable to find test update files")
+
+ def read_file(self, filename):
+ fp = open(self.testdir + filename, "r")
+ data = fp.read()
+ fp.close()
+ return data
+
+ def test_0(self):
+ """
+ Test simple CSR with no attributes
+ """
+ csr = self.read_file("test0.csr")
+ request = pkcs10.load_certificate_request(csr)
+
+ attributes = request.get_attributes()
+ subject = request.get_subject()
+ components = subject.get_components()
+ compdict = dict(components)
+
+ assert(attributes == ())
+ assert(compdict['CN'] == u'test.example.com')
+ assert(compdict['ST'] == u'California')
+ assert(compdict['C'] == u'US')
+
+ def test_1(self):
+ """
+ Test CSR with subject alt name
+ """
+ csr = self.read_file("test1.csr")
+ request = pkcs10.load_certificate_request(csr)
+
+ attributes = request.get_attributes()
+ subject = request.get_subject()
+ components = subject.get_components()
+ compdict = dict(components)
+ attrdict = dict(attributes)
+
+ assert(compdict['CN'] == u'test.example.com')
+ assert(compdict['ST'] == u'California')
+ assert(compdict['C'] == u'US')
+
+ extensions = attrdict['1.2.840.113549.1.9.14']
+
+ for ext in range(len(extensions)):
+ if extensions[ext][0] == '2.5.29.17':
+ names = extensions[ext][2]
+ # check the dNSName field
+ assert(names[2] == [u'testlow.example.com'])
+
+ def test_2(self):
+ """
+ Test CSR with subject alt name and a list of CRL distribution points
+ """
+ csr = self.read_file("test2.csr")
+ request = pkcs10.load_certificate_request(csr)
+
+ attributes = request.get_attributes()
+ subject = request.get_subject()
+ components = subject.get_components()
+ compdict = dict(components)
+ attrdict = dict(attributes)
+
+ assert(compdict['CN'] == u'test.example.com')
+ assert(compdict['ST'] == u'California')
+ assert(compdict['C'] == u'US')
+
+ extensions = attrdict['1.2.840.113549.1.9.14']
+
+ for ext in range(len(extensions)):
+ if extensions[ext][0] == '2.5.29.17':
+ names = extensions[ext][2]
+ # check the dNSName field
+ assert(names[2] == [u'testlow.example.com'])
+ if extensions[ext][0] == '2.5.29.31':
+ urls = extensions[ext][2]
+ assert(len(urls) == 2)
+ assert(urls[0] == u'http://ca.example.com/my.crl')
+ assert(urls[1] == u'http://other.example.com/my.crl')