summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ipa.spec.in5
-rw-r--r--ipalib/pkcs10.py426
-rw-r--r--ipalib/plugins/cert.py40
-rw-r--r--ipapython/nsslib.py4
-rw-r--r--ipaserver/install/dsinstance.py2
-rw-r--r--ipaserver/plugins/selfsign.py28
-rw-r--r--tests/test_ipalib/test_x509.py12
-rw-r--r--tests/test_pkcs10/test3.csr3
-rw-r--r--tests/test_pkcs10/test4.csr4
-rw-r--r--tests/test_pkcs10/test5.csr20
-rw-r--r--tests/test_pkcs10/test_pkcs10.py95
11 files changed, 158 insertions, 481 deletions
diff --git a/ipa.spec.in b/ipa.spec.in
index 8bca7337f..0ccf7018f 100644
--- a/ipa.spec.in
+++ b/ipa.spec.in
@@ -176,7 +176,7 @@ Requires: python-kerberos >= 1.1-3
Requires: authconfig
Requires: gnupg
Requires: pyOpenSSL
-Requires: python-nss >= 0.9
+Requires: python-nss >= 0.9-8
Requires: python-lxml
%description python
@@ -494,6 +494,9 @@ fi
%endif
%changelog
+* Mon Jul 19 2010 Rob Crittenden <rcritten@redhat.com> - 1.99-25
+- Bump up minimum version of python-nss to pick up nss_is_initialize() API
+
* Thu Jun 24 2010 Adam Young <ayoung@redhat.com> - 1.99-24
- Removed python-asset based webui
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py
index 9119d12e2..3011c1f7a 100644
--- a/ipalib/pkcs10.py
+++ b/ipalib/pkcs10.py
@@ -1,7 +1,7 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
-# Copyright (C) 2009 Red Hat
+# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -17,356 +17,34 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# Read PKCS#10 certificate requests (see RFC 2986 and 5280)
+import os
+import sys
+import base64
+import nss.nss as nss
+from ipapython import ipautil
+from ipalib import api
-# NOTE: Not every extension is currently handled. Known to now work:
-# 2.5.29.37 - extKeyUsage
+PEM = 0
+DER = 1
-import sys, string, base64
-from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful
-from pyasn1.codec.der import decoder, encoder
-from pyasn1 import error
-import copy
-
-# Common OIDs found in a subject
-oidtable = { "2.5.4.3": "CN",
- "2.5.4.6": "C",
- "2.5.4.7": "L",
- "2.5.4.8": "ST",
- "2.5.4.10": "O",
- "2.5.4.11": "OU",
- "1.2.840.113549.1.9.1": "E",
- "0.9.2342.19200300.100.1.25": "DC",
- }
-
-# Some useful OIDs
-FRIENDLYNAME = '1.2.840.113549.1.9.20'
-EXTENSIONREQUEST = '1.2.840.113549.1.9.14'
-
-MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h
-
-class DirectoryString(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- )
-
-class AttributeValue(DirectoryString): pass
-
-class AttributeType(univ.ObjectIdentifier): pass
-
-class AttributeTypeAndValue(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type
- )
-
-class KeyPurposeId(univ.ObjectIdentifier): pass
-
-class ExtKeyUsageSyntax(univ.SequenceOf):
- componentType = KeyPurposeId()
-
-class UPN(char.UTF8String):
- tagSet = char.UTF8String.tagSet.tagExplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
- )
-
-class AttributeValueSet(univ.SetOf):
- componentType = univ.Any()
- sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class Attribute(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('values', AttributeValueSet()),
- )
-
-class Attributes(univ.SetOf):
- componentType = Attribute()
-
-class RelativeDistinguishedName(univ.SetOf):
- componentType = AttributeTypeAndValue()
-
-class RDNSequence(univ.SequenceOf):
- componentType = RelativeDistinguishedName()
-
-class Name(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('', RDNSequence())
- )
-
- def get_components(self):
- components = self.getComponentByPosition(0)
- complist = []
- for idx in range(len(components)):
- attrandvalue = components[idx].getComponentByPosition(0)
- oid = attrandvalue.getComponentByPosition(0)
- # FIXME, should handle any string type
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
- vout = value.prettyOut(value).decode('utf-8')
- oidout = oid.prettyOut(oid).decode('utf-8')
- c = ((oidtable.get(oidout, oidout), vout))
- complist.append(c)
-
- return tuple(complist)
-
-class AnotherName(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type-id', univ.ObjectIdentifier()),
- namedtype.NamedType('value', univ.Any())
- )
-
-class rfc822Name(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)
- )
-
-class dNSName(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
- )
-
-class x400Address(univ.OctetString):
- tagSet = univ.OctetString.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3)
- )
-
-class directoryName(Name):
- tagSet = Name.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4)
- )
-
-class uniformResourceIdentifier(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)
- )
-
-# Not all general types are handled, nor are these necessarily done
-# per the specification.
-class GeneralName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('rfc822Name', rfc822Name()), #1
- namedtype.NamedType('dNSName', dNSName()), #2
- namedtype.NamedType('x400Address', x400Address()), #3
- namedtype.NamedType('directoryName', directoryName()), #4
- # 5 FIXME
- namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6
-# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))),
- )
-
-class GeneralNames(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class SubjectAltName(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class DistributionPointName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
- )
-
-class DistributionPoint(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME
- namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
- )
-
-class cRLDistributionPoints(univ.SequenceOf):
- componentType = DistributionPoint()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class basicConstraints(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.DefaultedNamedType('cA', univ.Boolean('False')),
- namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()),
- )
-
-class AlgorithmIdentifier(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
- namedtype.OptionalNamedType('parameters', univ.Any())
- )
-
-class SubjectPublicKeyInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', AlgorithmIdentifier()),
- namedtype.NamedType('subjectPublicKey', univ.BitString())
- )
-
-class Version(univ.Integer): pass
-
-class CertificationRequestInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('version', Version()),
- namedtype.NamedType('subject', Name()),
- namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
- namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
- )
-
-class CertificationRequest(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()),
- namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
- namedtype.NamedType('signatureValue', univ.BitString())
- )
-
- def get_version(self):
- info = self.getComponentByName('certificationRequestInfo')
- version = info.getComponentByName('version')
- return version._value
-
- def get_subject(self):
- info = self.getComponentByName('certificationRequestInfo')
- return info.getComponentByName('subject')
-
- def get_subjectaltname(self):
- attrs = self.get_attributes()
- attrdict = dict(attrs)
- if EXTENSIONREQUEST in attrdict:
- # Extensions are a 3 position tuple
- for ext in attrdict[EXTENSIONREQUEST]:
- if ext[0] == '2.5.29.17':
- # alt name is in the dNSName position
- return ext[2][2]
-
- def get_attributes(self):
- info = self.getComponentByName('certificationRequestInfo')
- attrs = info.getComponentByName('attributes')
- attributes = []
-
- for idx in range(len(attrs)):
- atype = attrs[idx].getComponentByPosition(0)
- aval = attrs[idx].getComponentByPosition(1)
-
- # The attribute list is of type Any, need to re-encode
- aenc = encoder.encode(aval, maxChunkSize=1024)
- decoded = decoder.decode(aenc)[0]
- oid = atype.prettyOut(atype)
-
- if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name
- value = decoded.getComponentByPosition(0)
- t = (oid, value.prettyOut(value).decode('utf-8'))
- attributes.append(t)
- elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req
- extensions = []
- extlist = decoded.getComponentByPosition(0)
- for jdx in range(len(extlist)):
- ext = extlist.getComponentByPosition(jdx)
- # An extension has 3 elements:
- # oid
- # bool - critical
- # value
- if len(ext) == 2: # If no critical, default to False
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = False
- extvalue = ext.getComponentByPosition(1)
- else:
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = bool(ext.getComponentByPosition(1)._value)
- extvalue = ext.getComponentByPosition(2)
-
- if extoid == '2.5.29.19': # basicConstraints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0]
- ca = bool(extdecoded[0])
- if len(extdecoded) == 2: # path length is optional
- pathlen = extdecoded[1]._value
- else:
- pathlen = None
- constraint = (ca, pathlen)
- e = (extoid, critical, constraint)
- extensions.append(e)
- continue
- elif extoid == '2.5.29.31': # cRLDistributionPoints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0]
- distpoints = []
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- # DistributionPoint is position 0
- distpoint = name.getComponentByPosition(0)
- # fullName is position 0
- fullname = distpoint.getComponentByPosition(0)
- for crl in range(len(fullname)):
- # Get the GeneralName, URI type
- uri = fullname.getComponentByPosition(crl).getComponentByPosition(5)
- distpoints.append(uri.prettyOut(uri).decode('utf-8'))
- e = (extoid, critical, tuple(distpoints))
- extensions.append(e)
- continue
-
- # The data is is encoded as "Any". Pull the raw data out
- # and re-decode it using a different specification.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0]
- except error.PyAsn1Error:
- # I've seen CSRs where this isn't a sequence of names
- # but is a single name, try to handle that too.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0]
- extdecoded = [extdecoded]
- except error.PyAsn1Error, e:
- # skip for now
- generalnames = 9*["Error"]
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- continue
-
- # We now have a list of extensions in the order they
- # are in the request as GeneralNames. We iterate through
- # each of those to get a GeneralName. We then have to
- # iterate through that to find the position set in it.
-
- # Note that not every type will be returned. Those that
- # are handled are returned in a tuple in the position
- # which they are in the request.
- generalnames = 9*[None]
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- for n in range(len(name)):
- if name[n] is None:
- continue
- if generalnames[n] is None:
- generalnames[n] = []
- if n == 3: # OctetString
- generalnames[n].append(name[n]._value)
- if n in [1, 2, 6]: # IA5String
- if n == 6 and extoid == "2.5.29.37":
- # Extended key usage
- v = copy.deepcopy(extvalue._value)
- othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0]
- keyusage = []
- for l in range(len(othername)):
- keyusage.append(othername[l].prettyOut(othername[l]))
+def get_subjectaltname(request):
+ """
+ Given a CSR return the subjectaltname value, if any.
- generalnames[n] = tuple(keyusage)
- else:
- generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8'))
- if n == 0: # AnotherName
- nameoid = name[n].getComponentByPosition(0)
- nameoid = nameoid.prettyOut(nameoid)
- val = name[n].getComponentByPosition(1)
- if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN
- v = copy.deepcopy(val._value)
- othername = decoder.decode(v, asn1Spec=UPN())[0]
- generalnames[0].append(othername.prettyOut(othername).decode('utf-8'))
+ The return value is a tuple of strings or None
+ """
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ return nss.x509_alt_name(extension.value)
+ return None
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- t = (oid, tuple(extensions))
- attributes.append(t)
+def get_subject(request):
+ """
+ Given a CSR return the subject value.
- return tuple(attributes)
+ This returns an nss.DN object.
+ """
+ return request.subject
def strip_header(csr):
"""
@@ -392,50 +70,26 @@ def load_certificate_request(csr):
substrate = base64.b64decode(csr)
- return decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
+ # A fail-safe so we can always read a CSR. python-nss/NSS will segfault
+ # otherwise
+ if not nss.nss_is_initialized():
+ nss.nss_init_nodb()
-if __name__ == '__main__':
- # Read PEM certs from stdin and print them out in plain text
-
- stSpam, stHam, stDump = 0, 1, 2
- state = stSpam
+ return nss.CertificateRequest(substrate)
- for certLine in sys.stdin.readlines():
- certLine = string.strip(certLine)
- if state == stSpam:
- if state == stSpam:
- if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----':
- certLines = []
- state = stHam
- continue
- if state == stHam:
- if certLine == '-----END NEW CERTIFICATE REQUEST-----':
- state = stDump
- else:
- certLines.append(certLine)
- complist = []
- if state == stDump:
- substrate = ''
- for certLine in certLines:
- substrate = substrate + base64.b64decode(certLine)
+if __name__ == '__main__':
+ nss.nss_init_nodb()
- request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
- subject = request.get_subject()
- attrs = request.get_attributes()
- print "Attributes:"
- print attrs
+ # Read PEM request from stdin and print out its components
- print "Subject:"
- complist = subject.get_components()
- print complist
- out=""
- for c in complist:
- out = out + "%s=%s," % (c[0], c[1])
- print out[:-1]
+ csrlines = sys.stdin.readlines()
+ csrlines = fp.readlines()
+ fp.close()
+ csr = ''.join(csrlines)
- print request.get_subjectaltname()
+ csr = load_certificate_request(csr)
- # Re-encode the request just to be sure things are working
- assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails'
+ print csr
- state = stSpam
+ print get_subject(csr)
+ print get_subjectaltname(csr)
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 1de4ac64e..ed1d65ad2 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -69,7 +69,6 @@ from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
-from pyasn1.error import PyAsn1Error
import logging
import traceback
from ipalib.text import _
@@ -77,37 +76,31 @@ from ipalib.request import context
from ipalib.output import Output
from ipalib.plugins.service import validate_principal
import nss.nss as nss
+from nss.error import NSPRError
def get_csr_hostname(csr):
"""
- Return the value of CN in the subject of the request
+ Return the value of CN in the subject of the request or None
"""
try:
request = pkcs10.load_certificate_request(csr)
- sub = request.get_subject().get_components()
- for s in sub:
- if s[0].lower() == "cn":
- return s[1]
- except PyAsn1Error:
- # The ASN.1 decoding errors tend to be long and involved and the
- # last bit is generally not interesting. We need the whole traceback.
- logging.error('Unable to decode CSR\n%s', traceback.format_exc())
- raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
-
- return None
+ subject = pkcs10.get_subject(request)
+ return subject.common_name
+ except NSPRError, nsprerr:
+ raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request:'))
def get_subjectaltname(csr):
"""
- Return the value of the subject alt name, if any
+ Return the first value of the subject alt name, if any
"""
try:
request = pkcs10.load_certificate_request(csr)
- except PyAsn1Error:
- # The ASN.1 decoding errors tend to be long and involved and the
- # last bit is generally not interesting. We need the whole traceback.
- logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ return nss.x509_alt_name(extension.value)[0]
+ return None
+ except NSPRError, nsprerr:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
- return request.get_subjectaltname()
def validate_csr(ugettext, csr):
"""
@@ -116,13 +109,9 @@ def validate_csr(ugettext, csr):
"""
try:
request = pkcs10.load_certificate_request(csr)
-
- # Explicitly request the attributes. This fires off additional
- # decoding to get things like the subjectAltName.
- attrs = request.get_attributes()
except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
- except PyAsn1Error:
+ except NSPRError:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
except Exception, e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
@@ -290,7 +279,8 @@ class cert_request(VirtualCommand):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
# Validate the subject alt name, if any
- subjectaltname = get_subjectaltname(csr)
+ request = pkcs10.load_certificate_request(csr)
+ subjectaltname = pkcs10.get_subjectaltname(request)
if subjectaltname is not None:
for name in subjectaltname:
try:
diff --git a/ipapython/nsslib.py b/ipapython/nsslib.py
index 02bff00a8..7e249b3ba 100644
--- a/ipapython/nsslib.py
+++ b/ipapython/nsslib.py
@@ -122,6 +122,10 @@ class NSSConnection(httplib.HTTPConnection):
raise RuntimeError("dbdir is required")
logging.debug('%s init %s', self.__class__.__name__, host)
+ if nss.nss_is_initialized():
+ # close any open NSS database and use the new one
+ ssl.clear_session_cache()
+ nss.nss_shutdown()
nss.nss_init(dbdir)
ssl.set_domestic_policy()
nss.set_password_callback(self.password_callback)
diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py
index e1ddef394..3ea9c94ce 100644
--- a/ipaserver/install/dsinstance.py
+++ b/ipaserver/install/dsinstance.py
@@ -366,7 +366,7 @@ class DsInstance(service.Service):
self._ldap_mod("ipa-winsync-conf.ldif")
def __config_version_module(self):
- self._ldap_mod("ipa-version-conf.ldif")
+ self._ldap_mod("version-conf.ldif")
def __user_private_groups(self):
if has_managed_entries(self.host_name, self.dm_password):
diff --git a/ipaserver/plugins/selfsign.py b/ipaserver/plugins/selfsign.py
index 39d1c539f..5333a89a3 100644
--- a/ipaserver/plugins/selfsign.py
+++ b/ipaserver/plugins/selfsign.py
@@ -45,10 +45,9 @@ import re
from ipaserver.plugins import rabase
from ipaserver.install import certs
import tempfile
-from pyasn1 import error
from ipalib import _
-from pyasn1.codec.der import encoder
from ipalib.plugins.cert import get_csr_hostname
+from nss.error import NSPRError
class ra(rabase.rabase):
"""
@@ -87,23 +86,19 @@ class ra(rabase.rabase):
config = api.Command['config_show']()['result']
subject_base = config.get('ipacertificatesubjectbase')[0]
hostname = get_csr_hostname(csr)
- request = pkcs10.load_certificate_request(csr)
base = re.split(',\s*(?=\w+=)', subject_base)
- base.reverse()
- base.append("CN=%s" % hostname)
- request_subject = request.get_subject().get_components()
- new_request = []
- for r in request_subject:
- new_request.append("%s=%s" % (r[0], r[1]))
-
- if str(base).lower() != str(new_request).lower():
- subject_base='CN=%s, %s' % (hostname, subject_base)
- new_request.reverse()
+ base.insert(0,'CN=%s' % hostname)
+ subject_base = ",".join(base)
+ request = pkcs10.load_certificate_request(csr)
+ # python-nss normalizes the request subject
+ request_subject = str(pkcs10.get_subject(request))
+
+ if str(subject_base).lower() != request_subject.lower():
raise errors.CertificateOperationError(error=_('Request subject "%(request_subject)s" does not match the form "%(subject_base)s"') % \
- {'request_subject' : ', '.join(new_request), 'subject_base' : subject_base})
+ {'request_subject' : request_subject, 'subject_base' : subject_base})
except errors.CertificateOperationError, e:
raise e
- except Exception, e:
+ except NSPRError, e:
raise errors.CertificateOperationError(error=_('unable to decode csr: %s' % e))
# certutil wants the CSR to have have a header and footer. Add one
@@ -207,11 +202,10 @@ class ra(rabase.rabase):
pass
try:
- # Grab the subject, reverse it, combine it and return it
subject = x509.get_subject(cert)
serial = x509.get_serial_number(cert)
- except error.PyAsn1Error, e:
+ except NSPRError, e:
self.log.error('Unable to decode certificate in entry: %s' % str(e))
raise errors.CertificateOperationError(error='Unable to decode certificate in entry: %s' % str(e))
diff --git a/tests/test_ipalib/test_x509.py b/tests/test_ipalib/test_x509.py
index 50e827caf..ca21e28cb 100644
--- a/tests/test_ipalib/test_x509.py
+++ b/tests/test_ipalib/test_x509.py
@@ -92,18 +92,18 @@ class test_x509(object):
Test retrieving the subject
"""
subject = x509.get_subject(goodcert)
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
der = base64.b64decode(goodcert)
subject = x509.get_subject(der, x509.DER)
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
# We should be able to pass in a tuple/list of certs too
subject = x509.get_subject((goodcert))
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
subject = x509.get_subject([goodcert])
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
def test_2_get_serial_number(self):
"""
@@ -132,8 +132,8 @@ class test_x509(object):
cert = x509.load_certificate(goodcert)
- assert cert.subject == 'CN=ipa.example.com,O=IPA'
- assert cert.issuer == 'CN=IPA Test Certificate Authority'
+ assert str(cert.subject) == 'CN=ipa.example.com,O=IPA'
+ assert str(cert.issuer) == 'CN=IPA Test Certificate Authority'
assert cert.serial_number == 1093
assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC'
assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC'
diff --git a/tests/test_pkcs10/test3.csr b/tests/test_pkcs10/test3.csr
new file mode 100644
index 000000000..82c84d154
--- /dev/null
+++ b/tests/test_pkcs10/test3.csr
@@ -0,0 +1,3 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+VGhpcyBpcyBhbiBpbnZhbGlkIENTUg==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/tests/test_pkcs10/test4.csr b/tests/test_pkcs10/test4.csr
new file mode 100644
index 000000000..9f08b802b
--- /dev/null
+++ b/tests/test_pkcs10/test4.csr
@@ -0,0 +1,4 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+Invalidate data
+-----END NEW CERTIFICATE REQUEST-----
+
diff --git a/tests/test_pkcs10/test5.csr b/tests/test_pkcs10/test5.csr
new file mode 100644
index 000000000..41c3c1f3d
--- /dev/null
+++ b/tests/test_pkcs10/test5.csr
@@ -0,0 +1,20 @@
+
+Certificate request generated by Netscape certutil
+Phone: (not specified)
+
+Common Name: test.example.com
+Email: (not specified)
+Organization: IPA
+State: (not specified)
+Country: (not specified)
+
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIIBaDCB0gIBADApMQwwCgYDVQQKEwNJUEExGTAXBgNVBAMTEHRlc3QuZXhhbXBs
+ZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPnSCLwl7IytP2HC7+zv
+nI2fe6oRCE/J8K1jIoiqS9engx3Yfe4kaXWWzcwmuUV57VhUmWDEQIbSREPdrVSi
+tWC55ilGmPOAEw+mP4qg6Ctb+d8Egmy1JVrpIYCLNXvEd3dAaimB0J+K3hKFRyHI
+2MzrIuFqqohRijkDLwB8oVVdAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQACt37K
+j+RMEbqG8s0Uxs3FhcfiAx8Do99CDizY/b7hZEgMyG4dLmm+vSCBbxBrG5oMlxJD
+dxnpk0PQSknNkJVrCS/J1OTpOPRTi4VKATT3tHJAfDbWZTwcSelUCLQ4lREiuT3D
+WP4vKrLIxDJDb+/mwuV7WWo34E6MD9iTB1xINg==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/tests/test_pkcs10/test_pkcs10.py b/tests/test_pkcs10/test_pkcs10.py
index 66d205b96..4c8ba1366 100644
--- a/tests/test_pkcs10/test_pkcs10.py
+++ b/tests/test_pkcs10/test_pkcs10.py
@@ -26,6 +26,8 @@ import nose
from tests.util import raises, PluginTester
from ipalib import pkcs10
from ipapython import ipautil
+import nss.nss as nss
+from nss.error import NSPRError
class test_update(object):
"""
@@ -33,6 +35,7 @@ class test_update(object):
"""
def setUp(self):
+ nss.nss_init_nodb()
if ipautil.file_exists("test0.csr"):
self.testdir="./"
elif ipautil.file_exists("tests/test_pkcs10/test0.csr"):
@@ -53,15 +56,11 @@ class test_update(object):
csr = self.read_file("test0.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
+ subject = pkcs10.get_subject(request)
- assert(attributes == ())
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
def test_1(self):
"""
@@ -70,23 +69,15 @@ class test_update(object):
csr = self.read_file("test1.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
- attrdict = dict(attributes)
+ subject = pkcs10.get_subject(request)
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
- extensions = attrdict['1.2.840.113549.1.9.14']
-
- for ext in range(len(extensions)):
- if extensions[ext][0] == '2.5.29.17':
- names = extensions[ext][2]
- # check the dNSName field
- assert(names[2] == [u'testlow.example.com'])
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
def test_2(self):
"""
@@ -95,25 +86,39 @@ class test_update(object):
csr = self.read_file("test2.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
- attrdict = dict(attributes)
-
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
-
- extensions = attrdict['1.2.840.113549.1.9.14']
-
- for ext in range(len(extensions)):
- if extensions[ext][0] == '2.5.29.17':
- names = extensions[ext][2]
- # check the dNSName field
- assert(names[2] == [u'testlow.example.com'])
- if extensions[ext][0] == '2.5.29.31':
- urls = extensions[ext][2]
- assert(len(urls) == 2)
- assert(urls[0] == u'http://ca.example.com/my.crl')
- assert(urls[1] == u'http://other.example.com/my.crl')
+ subject = pkcs10.get_subject(request)
+
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
+
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
+ if extension.oid_tag == nss.SEC_OID_X509_CRL_DIST_POINTS:
+ pts = nss.CRLDistributionPts(extension.value)
+ urls = pts[0].get_general_names()
+ assert('http://ca.example.com/my.crl' in urls)
+ assert('http://other.example.com/my.crl' in urls)
+
+ def test_3(self):
+ """
+ Test CSR with base64-encoded bogus data
+ """
+ csr = self.read_file("test3.csr")
+
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except NSPRError, nsprerr:
+ # (SEC_ERROR_BAD_DER) security library: improperly formatted DER-encoded message.
+ assert(nsprerr. errno== -8183)
+
+ def test_4(self):
+ """
+ Test CSR with badly formatted base64-encoded data
+ """
+ csr = self.read_file("test4.csr")
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except TypeError, typeerr:
+ assert(str(typeerr) == 'Incorrect padding')