summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2010-07-20 14:00:43 -0400
committerRob Crittenden <rcritten@redhat.com>2010-07-29 10:50:10 -0400
commitb7ca3d68c28b54500a2f908c4e2e6c89b2433461 (patch)
treefca9d664df546fca527a8194e0b4e9e301aa1b06
parent563c7cde407bc63621a14b1fddff972a105dfc50 (diff)
downloadfreeipa-b7ca3d68c28b54500a2f908c4e2e6c89b2433461.tar.gz
freeipa-b7ca3d68c28b54500a2f908c4e2e6c89b2433461.tar.xz
freeipa-b7ca3d68c28b54500a2f908c4e2e6c89b2433461.zip
Drop our own PKCS#10 ASN.1 decoder and use the one from python-nss
This patch: - bumps up the minimum version of python-nss - will initialize NSS with nodb if a CSR is loaded and it isn't already init'd - will shutdown NSS if initialized in the RPC subsystem so we use right db - updated and added a few more tests Relying more on NSS introduces a bit of a problem. For NSS to work you need to have initialized a database (either a real one or no_db). But once you've initialized one and want to use another you have to close down the first one. I've added some code to nsslib.py to do just that. This could potentially have some bad side-effects at some point, it works ok now.
-rw-r--r--ipa.spec.in5
-rw-r--r--ipalib/pkcs10.py426
-rw-r--r--ipalib/plugins/cert.py40
-rw-r--r--ipapython/nsslib.py4
-rw-r--r--ipaserver/install/dsinstance.py2
-rw-r--r--ipaserver/plugins/selfsign.py28
-rw-r--r--tests/test_ipalib/test_x509.py12
-rw-r--r--tests/test_pkcs10/test3.csr3
-rw-r--r--tests/test_pkcs10/test4.csr4
-rw-r--r--tests/test_pkcs10/test5.csr20
-rw-r--r--tests/test_pkcs10/test_pkcs10.py95
11 files changed, 158 insertions, 481 deletions
diff --git a/ipa.spec.in b/ipa.spec.in
index 8bca7337f..0ccf7018f 100644
--- a/ipa.spec.in
+++ b/ipa.spec.in
@@ -176,7 +176,7 @@ Requires: python-kerberos >= 1.1-3
Requires: authconfig
Requires: gnupg
Requires: pyOpenSSL
-Requires: python-nss >= 0.9
+Requires: python-nss >= 0.9-8
Requires: python-lxml
%description python
@@ -494,6 +494,9 @@ fi
%endif
%changelog
+* Mon Jul 19 2010 Rob Crittenden <rcritten@redhat.com> - 1.99-25
+- Bump up minimum version of python-nss to pick up nss_is_initialize() API
+
* Thu Jun 24 2010 Adam Young <ayoung@redhat.com> - 1.99-24
- Removed python-asset based webui
diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py
index 9119d12e2..3011c1f7a 100644
--- a/ipalib/pkcs10.py
+++ b/ipalib/pkcs10.py
@@ -1,7 +1,7 @@
# Authors:
# Rob Crittenden <rcritten@redhat.com>
#
-# Copyright (C) 2009 Red Hat
+# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
@@ -17,356 +17,34 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-# Read PKCS#10 certificate requests (see RFC 2986 and 5280)
+import os
+import sys
+import base64
+import nss.nss as nss
+from ipapython import ipautil
+from ipalib import api
-# NOTE: Not every extension is currently handled. Known to now work:
-# 2.5.29.37 - extKeyUsage
+PEM = 0
+DER = 1
-import sys, string, base64
-from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful
-from pyasn1.codec.der import decoder, encoder
-from pyasn1 import error
-import copy
-
-# Common OIDs found in a subject
-oidtable = { "2.5.4.3": "CN",
- "2.5.4.6": "C",
- "2.5.4.7": "L",
- "2.5.4.8": "ST",
- "2.5.4.10": "O",
- "2.5.4.11": "OU",
- "1.2.840.113549.1.9.1": "E",
- "0.9.2342.19200300.100.1.25": "DC",
- }
-
-# Some useful OIDs
-FRIENDLYNAME = '1.2.840.113549.1.9.20'
-EXTENSIONREQUEST = '1.2.840.113549.1.9.14'
-
-MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h
-
-class DirectoryString(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- )
-
-class AttributeValue(DirectoryString): pass
-
-class AttributeType(univ.ObjectIdentifier): pass
-
-class AttributeTypeAndValue(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type
- )
-
-class KeyPurposeId(univ.ObjectIdentifier): pass
-
-class ExtKeyUsageSyntax(univ.SequenceOf):
- componentType = KeyPurposeId()
-
-class UPN(char.UTF8String):
- tagSet = char.UTF8String.tagSet.tagExplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0)
- )
-
-class AttributeValueSet(univ.SetOf):
- componentType = univ.Any()
- sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class Attribute(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('values', AttributeValueSet()),
- )
-
-class Attributes(univ.SetOf):
- componentType = Attribute()
-
-class RelativeDistinguishedName(univ.SetOf):
- componentType = AttributeTypeAndValue()
-
-class RDNSequence(univ.SequenceOf):
- componentType = RelativeDistinguishedName()
-
-class Name(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('', RDNSequence())
- )
-
- def get_components(self):
- components = self.getComponentByPosition(0)
- complist = []
- for idx in range(len(components)):
- attrandvalue = components[idx].getComponentByPosition(0)
- oid = attrandvalue.getComponentByPosition(0)
- # FIXME, should handle any string type
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
- vout = value.prettyOut(value).decode('utf-8')
- oidout = oid.prettyOut(oid).decode('utf-8')
- c = ((oidtable.get(oidout, oidout), vout))
- complist.append(c)
-
- return tuple(complist)
-
-class AnotherName(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type-id', univ.ObjectIdentifier()),
- namedtype.NamedType('value', univ.Any())
- )
-
-class rfc822Name(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1)
- )
-
-class dNSName(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2)
- )
-
-class x400Address(univ.OctetString):
- tagSet = univ.OctetString.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3)
- )
-
-class directoryName(Name):
- tagSet = Name.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4)
- )
-
-class uniformResourceIdentifier(char.IA5String):
- tagSet = char.IA5String.tagSet.tagImplicitly(
- tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6)
- )
-
-# Not all general types are handled, nor are these necessarily done
-# per the specification.
-class GeneralName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('rfc822Name', rfc822Name()), #1
- namedtype.NamedType('dNSName', dNSName()), #2
- namedtype.NamedType('x400Address', x400Address()), #3
- namedtype.NamedType('directoryName', directoryName()), #4
- # 5 FIXME
- namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6
-# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))),
- )
-
-class GeneralNames(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class SubjectAltName(univ.SequenceOf):
- componentType = GeneralName()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class DistributionPointName(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
- )
-
-class DistributionPoint(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))),
- namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME
- namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
- )
-
-class cRLDistributionPoints(univ.SequenceOf):
- componentType = DistributionPoint()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class basicConstraints(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.DefaultedNamedType('cA', univ.Boolean('False')),
- namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()),
- )
-
-class AlgorithmIdentifier(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
- namedtype.OptionalNamedType('parameters', univ.Any())
- )
-
-class SubjectPublicKeyInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', AlgorithmIdentifier()),
- namedtype.NamedType('subjectPublicKey', univ.BitString())
- )
-
-class Version(univ.Integer): pass
-
-class CertificationRequestInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('version', Version()),
- namedtype.NamedType('subject', Name()),
- namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
- namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))
- )
-
-class CertificationRequest(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()),
- namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
- namedtype.NamedType('signatureValue', univ.BitString())
- )
-
- def get_version(self):
- info = self.getComponentByName('certificationRequestInfo')
- version = info.getComponentByName('version')
- return version._value
-
- def get_subject(self):
- info = self.getComponentByName('certificationRequestInfo')
- return info.getComponentByName('subject')
-
- def get_subjectaltname(self):
- attrs = self.get_attributes()
- attrdict = dict(attrs)
- if EXTENSIONREQUEST in attrdict:
- # Extensions are a 3 position tuple
- for ext in attrdict[EXTENSIONREQUEST]:
- if ext[0] == '2.5.29.17':
- # alt name is in the dNSName position
- return ext[2][2]
-
- def get_attributes(self):
- info = self.getComponentByName('certificationRequestInfo')
- attrs = info.getComponentByName('attributes')
- attributes = []
-
- for idx in range(len(attrs)):
- atype = attrs[idx].getComponentByPosition(0)
- aval = attrs[idx].getComponentByPosition(1)
-
- # The attribute list is of type Any, need to re-encode
- aenc = encoder.encode(aval, maxChunkSize=1024)
- decoded = decoder.decode(aenc)[0]
- oid = atype.prettyOut(atype)
-
- if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name
- value = decoded.getComponentByPosition(0)
- t = (oid, value.prettyOut(value).decode('utf-8'))
- attributes.append(t)
- elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req
- extensions = []
- extlist = decoded.getComponentByPosition(0)
- for jdx in range(len(extlist)):
- ext = extlist.getComponentByPosition(jdx)
- # An extension has 3 elements:
- # oid
- # bool - critical
- # value
- if len(ext) == 2: # If no critical, default to False
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = False
- extvalue = ext.getComponentByPosition(1)
- else:
- extoid = atype.prettyOut(ext.getComponentByPosition(0))
- critical = bool(ext.getComponentByPosition(1)._value)
- extvalue = ext.getComponentByPosition(2)
-
- if extoid == '2.5.29.19': # basicConstraints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0]
- ca = bool(extdecoded[0])
- if len(extdecoded) == 2: # path length is optional
- pathlen = extdecoded[1]._value
- else:
- pathlen = None
- constraint = (ca, pathlen)
- e = (extoid, critical, constraint)
- extensions.append(e)
- continue
- elif extoid == '2.5.29.31': # cRLDistributionPoints
- extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0]
- distpoints = []
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- # DistributionPoint is position 0
- distpoint = name.getComponentByPosition(0)
- # fullName is position 0
- fullname = distpoint.getComponentByPosition(0)
- for crl in range(len(fullname)):
- # Get the GeneralName, URI type
- uri = fullname.getComponentByPosition(crl).getComponentByPosition(5)
- distpoints.append(uri.prettyOut(uri).decode('utf-8'))
- e = (extoid, critical, tuple(distpoints))
- extensions.append(e)
- continue
-
- # The data is is encoded as "Any". Pull the raw data out
- # and re-decode it using a different specification.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0]
- except error.PyAsn1Error:
- # I've seen CSRs where this isn't a sequence of names
- # but is a single name, try to handle that too.
- try:
- extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0]
- extdecoded = [extdecoded]
- except error.PyAsn1Error, e:
- # skip for now
- generalnames = 9*["Error"]
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- continue
-
- # We now have a list of extensions in the order they
- # are in the request as GeneralNames. We iterate through
- # each of those to get a GeneralName. We then have to
- # iterate through that to find the position set in it.
-
- # Note that not every type will be returned. Those that
- # are handled are returned in a tuple in the position
- # which they are in the request.
- generalnames = 9*[None]
- for elem in range(len(extdecoded)):
- name = extdecoded[elem]
- for n in range(len(name)):
- if name[n] is None:
- continue
- if generalnames[n] is None:
- generalnames[n] = []
- if n == 3: # OctetString
- generalnames[n].append(name[n]._value)
- if n in [1, 2, 6]: # IA5String
- if n == 6 and extoid == "2.5.29.37":
- # Extended key usage
- v = copy.deepcopy(extvalue._value)
- othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0]
- keyusage = []
- for l in range(len(othername)):
- keyusage.append(othername[l].prettyOut(othername[l]))
+def get_subjectaltname(request):
+ """
+ Given a CSR return the subjectaltname value, if any.
- generalnames[n] = tuple(keyusage)
- else:
- generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8'))
- if n == 0: # AnotherName
- nameoid = name[n].getComponentByPosition(0)
- nameoid = nameoid.prettyOut(nameoid)
- val = name[n].getComponentByPosition(1)
- if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN
- v = copy.deepcopy(val._value)
- othername = decoder.decode(v, asn1Spec=UPN())[0]
- generalnames[0].append(othername.prettyOut(othername).decode('utf-8'))
+ The return value is a tuple of strings or None
+ """
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ return nss.x509_alt_name(extension.value)
+ return None
- e = (extoid, critical, tuple(generalnames))
- extensions.append(e)
- t = (oid, tuple(extensions))
- attributes.append(t)
+def get_subject(request):
+ """
+ Given a CSR return the subject value.
- return tuple(attributes)
+ This returns an nss.DN object.
+ """
+ return request.subject
def strip_header(csr):
"""
@@ -392,50 +70,26 @@ def load_certificate_request(csr):
substrate = base64.b64decode(csr)
- return decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
+ # A fail-safe so we can always read a CSR. python-nss/NSS will segfault
+ # otherwise
+ if not nss.nss_is_initialized():
+ nss.nss_init_nodb()
-if __name__ == '__main__':
- # Read PEM certs from stdin and print them out in plain text
-
- stSpam, stHam, stDump = 0, 1, 2
- state = stSpam
+ return nss.CertificateRequest(substrate)
- for certLine in sys.stdin.readlines():
- certLine = string.strip(certLine)
- if state == stSpam:
- if state == stSpam:
- if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----':
- certLines = []
- state = stHam
- continue
- if state == stHam:
- if certLine == '-----END NEW CERTIFICATE REQUEST-----':
- state = stDump
- else:
- certLines.append(certLine)
- complist = []
- if state == stDump:
- substrate = ''
- for certLine in certLines:
- substrate = substrate + base64.b64decode(certLine)
+if __name__ == '__main__':
+ nss.nss_init_nodb()
- request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0]
- subject = request.get_subject()
- attrs = request.get_attributes()
- print "Attributes:"
- print attrs
+ # Read PEM request from stdin and print out its components
- print "Subject:"
- complist = subject.get_components()
- print complist
- out=""
- for c in complist:
- out = out + "%s=%s," % (c[0], c[1])
- print out[:-1]
+ csrlines = sys.stdin.readlines()
+ csrlines = fp.readlines()
+ fp.close()
+ csr = ''.join(csrlines)
- print request.get_subjectaltname()
+ csr = load_certificate_request(csr)
- # Re-encode the request just to be sure things are working
- assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails'
+ print csr
- state = stSpam
+ print get_subject(csr)
+ print get_subjectaltname(csr)
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 1de4ac64e..ed1d65ad2 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -69,7 +69,6 @@ from ipalib import x509
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
import base64
-from pyasn1.error import PyAsn1Error
import logging
import traceback
from ipalib.text import _
@@ -77,37 +76,31 @@ from ipalib.request import context
from ipalib.output import Output
from ipalib.plugins.service import validate_principal
import nss.nss as nss
+from nss.error import NSPRError
def get_csr_hostname(csr):
"""
- Return the value of CN in the subject of the request
+ Return the value of CN in the subject of the request or None
"""
try:
request = pkcs10.load_certificate_request(csr)
- sub = request.get_subject().get_components()
- for s in sub:
- if s[0].lower() == "cn":
- return s[1]
- except PyAsn1Error:
- # The ASN.1 decoding errors tend to be long and involved and the
- # last bit is generally not interesting. We need the whole traceback.
- logging.error('Unable to decode CSR\n%s', traceback.format_exc())
- raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
-
- return None
+ subject = pkcs10.get_subject(request)
+ return subject.common_name
+ except NSPRError, nsprerr:
+ raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request:'))
def get_subjectaltname(csr):
"""
- Return the value of the subject alt name, if any
+ Return the first value of the subject alt name, if any
"""
try:
request = pkcs10.load_certificate_request(csr)
- except PyAsn1Error:
- # The ASN.1 decoding errors tend to be long and involved and the
- # last bit is generally not interesting. We need the whole traceback.
- logging.error('Unable to decode CSR\n%s', traceback.format_exc())
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ return nss.x509_alt_name(extension.value)[0]
+ return None
+ except NSPRError, nsprerr:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
- return request.get_subjectaltname()
def validate_csr(ugettext, csr):
"""
@@ -116,13 +109,9 @@ def validate_csr(ugettext, csr):
"""
try:
request = pkcs10.load_certificate_request(csr)
-
- # Explicitly request the attributes. This fires off additional
- # decoding to get things like the subjectAltName.
- attrs = request.get_attributes()
except TypeError, e:
raise errors.Base64DecodeError(reason=str(e))
- except PyAsn1Error:
+ except NSPRError:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request'))
except Exception, e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
@@ -290,7 +279,8 @@ class cert_request(VirtualCommand):
raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn)
# Validate the subject alt name, if any
- subjectaltname = get_subjectaltname(csr)
+ request = pkcs10.load_certificate_request(csr)
+ subjectaltname = pkcs10.get_subjectaltname(request)
if subjectaltname is not None:
for name in subjectaltname:
try:
diff --git a/ipapython/nsslib.py b/ipapython/nsslib.py
index 02bff00a8..7e249b3ba 100644
--- a/ipapython/nsslib.py
+++ b/ipapython/nsslib.py
@@ -122,6 +122,10 @@ class NSSConnection(httplib.HTTPConnection):
raise RuntimeError("dbdir is required")
logging.debug('%s init %s', self.__class__.__name__, host)
+ if nss.nss_is_initialized():
+ # close any open NSS database and use the new one
+ ssl.clear_session_cache()
+ nss.nss_shutdown()
nss.nss_init(dbdir)
ssl.set_domestic_policy()
nss.set_password_callback(self.password_callback)
diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py
index e1ddef394..3ea9c94ce 100644
--- a/ipaserver/install/dsinstance.py
+++ b/ipaserver/install/dsinstance.py
@@ -366,7 +366,7 @@ class DsInstance(service.Service):
self._ldap_mod("ipa-winsync-conf.ldif")
def __config_version_module(self):
- self._ldap_mod("ipa-version-conf.ldif")
+ self._ldap_mod("version-conf.ldif")
def __user_private_groups(self):
if has_managed_entries(self.host_name, self.dm_password):
diff --git a/ipaserver/plugins/selfsign.py b/ipaserver/plugins/selfsign.py
index 39d1c539f..5333a89a3 100644
--- a/ipaserver/plugins/selfsign.py
+++ b/ipaserver/plugins/selfsign.py
@@ -45,10 +45,9 @@ import re
from ipaserver.plugins import rabase
from ipaserver.install import certs
import tempfile
-from pyasn1 import error
from ipalib import _
-from pyasn1.codec.der import encoder
from ipalib.plugins.cert import get_csr_hostname
+from nss.error import NSPRError
class ra(rabase.rabase):
"""
@@ -87,23 +86,19 @@ class ra(rabase.rabase):
config = api.Command['config_show']()['result']
subject_base = config.get('ipacertificatesubjectbase')[0]
hostname = get_csr_hostname(csr)
- request = pkcs10.load_certificate_request(csr)
base = re.split(',\s*(?=\w+=)', subject_base)
- base.reverse()
- base.append("CN=%s" % hostname)
- request_subject = request.get_subject().get_components()
- new_request = []
- for r in request_subject:
- new_request.append("%s=%s" % (r[0], r[1]))
-
- if str(base).lower() != str(new_request).lower():
- subject_base='CN=%s, %s' % (hostname, subject_base)
- new_request.reverse()
+ base.insert(0,'CN=%s' % hostname)
+ subject_base = ",".join(base)
+ request = pkcs10.load_certificate_request(csr)
+ # python-nss normalizes the request subject
+ request_subject = str(pkcs10.get_subject(request))
+
+ if str(subject_base).lower() != request_subject.lower():
raise errors.CertificateOperationError(error=_('Request subject "%(request_subject)s" does not match the form "%(subject_base)s"') % \
- {'request_subject' : ', '.join(new_request), 'subject_base' : subject_base})
+ {'request_subject' : request_subject, 'subject_base' : subject_base})
except errors.CertificateOperationError, e:
raise e
- except Exception, e:
+ except NSPRError, e:
raise errors.CertificateOperationError(error=_('unable to decode csr: %s' % e))
# certutil wants the CSR to have have a header and footer. Add one
@@ -207,11 +202,10 @@ class ra(rabase.rabase):
pass
try:
- # Grab the subject, reverse it, combine it and return it
subject = x509.get_subject(cert)
serial = x509.get_serial_number(cert)
- except error.PyAsn1Error, e:
+ except NSPRError, e:
self.log.error('Unable to decode certificate in entry: %s' % str(e))
raise errors.CertificateOperationError(error='Unable to decode certificate in entry: %s' % str(e))
diff --git a/tests/test_ipalib/test_x509.py b/tests/test_ipalib/test_x509.py
index 50e827caf..ca21e28cb 100644
--- a/tests/test_ipalib/test_x509.py
+++ b/tests/test_ipalib/test_x509.py
@@ -92,18 +92,18 @@ class test_x509(object):
Test retrieving the subject
"""
subject = x509.get_subject(goodcert)
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
der = base64.b64decode(goodcert)
subject = x509.get_subject(der, x509.DER)
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
# We should be able to pass in a tuple/list of certs too
subject = x509.get_subject((goodcert))
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
subject = x509.get_subject([goodcert])
- assert subject == 'CN=ipa.example.com,O=IPA'
+ assert str(subject) == 'CN=ipa.example.com,O=IPA'
def test_2_get_serial_number(self):
"""
@@ -132,8 +132,8 @@ class test_x509(object):
cert = x509.load_certificate(goodcert)
- assert cert.subject == 'CN=ipa.example.com,O=IPA'
- assert cert.issuer == 'CN=IPA Test Certificate Authority'
+ assert str(cert.subject) == 'CN=ipa.example.com,O=IPA'
+ assert str(cert.issuer) == 'CN=IPA Test Certificate Authority'
assert cert.serial_number == 1093
assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC'
assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC'
diff --git a/tests/test_pkcs10/test3.csr b/tests/test_pkcs10/test3.csr
new file mode 100644
index 000000000..82c84d154
--- /dev/null
+++ b/tests/test_pkcs10/test3.csr
@@ -0,0 +1,3 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+VGhpcyBpcyBhbiBpbnZhbGlkIENTUg==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/tests/test_pkcs10/test4.csr b/tests/test_pkcs10/test4.csr
new file mode 100644
index 000000000..9f08b802b
--- /dev/null
+++ b/tests/test_pkcs10/test4.csr
@@ -0,0 +1,4 @@
+-----BEGIN NEW CERTIFICATE REQUEST-----
+Invalidate data
+-----END NEW CERTIFICATE REQUEST-----
+
diff --git a/tests/test_pkcs10/test5.csr b/tests/test_pkcs10/test5.csr
new file mode 100644
index 000000000..41c3c1f3d
--- /dev/null
+++ b/tests/test_pkcs10/test5.csr
@@ -0,0 +1,20 @@
+
+Certificate request generated by Netscape certutil
+Phone: (not specified)
+
+Common Name: test.example.com
+Email: (not specified)
+Organization: IPA
+State: (not specified)
+Country: (not specified)
+
+-----BEGIN NEW CERTIFICATE REQUEST-----
+MIIBaDCB0gIBADApMQwwCgYDVQQKEwNJUEExGTAXBgNVBAMTEHRlc3QuZXhhbXBs
+ZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPnSCLwl7IytP2HC7+zv
+nI2fe6oRCE/J8K1jIoiqS9engx3Yfe4kaXWWzcwmuUV57VhUmWDEQIbSREPdrVSi
+tWC55ilGmPOAEw+mP4qg6Ctb+d8Egmy1JVrpIYCLNXvEd3dAaimB0J+K3hKFRyHI
+2MzrIuFqqohRijkDLwB8oVVdAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQACt37K
+j+RMEbqG8s0Uxs3FhcfiAx8Do99CDizY/b7hZEgMyG4dLmm+vSCBbxBrG5oMlxJD
+dxnpk0PQSknNkJVrCS/J1OTpOPRTi4VKATT3tHJAfDbWZTwcSelUCLQ4lREiuT3D
+WP4vKrLIxDJDb+/mwuV7WWo34E6MD9iTB1xINg==
+-----END NEW CERTIFICATE REQUEST-----
diff --git a/tests/test_pkcs10/test_pkcs10.py b/tests/test_pkcs10/test_pkcs10.py
index 66d205b96..4c8ba1366 100644
--- a/tests/test_pkcs10/test_pkcs10.py
+++ b/tests/test_pkcs10/test_pkcs10.py
@@ -26,6 +26,8 @@ import nose
from tests.util import raises, PluginTester
from ipalib import pkcs10
from ipapython import ipautil
+import nss.nss as nss
+from nss.error import NSPRError
class test_update(object):
"""
@@ -33,6 +35,7 @@ class test_update(object):
"""
def setUp(self):
+ nss.nss_init_nodb()
if ipautil.file_exists("test0.csr"):
self.testdir="./"
elif ipautil.file_exists("tests/test_pkcs10/test0.csr"):
@@ -53,15 +56,11 @@ class test_update(object):
csr = self.read_file("test0.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
+ subject = pkcs10.get_subject(request)
- assert(attributes == ())
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
def test_1(self):
"""
@@ -70,23 +69,15 @@ class test_update(object):
csr = self.read_file("test1.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
- attrdict = dict(attributes)
+ subject = pkcs10.get_subject(request)
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
- extensions = attrdict['1.2.840.113549.1.9.14']
-
- for ext in range(len(extensions)):
- if extensions[ext][0] == '2.5.29.17':
- names = extensions[ext][2]
- # check the dNSName field
- assert(names[2] == [u'testlow.example.com'])
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
def test_2(self):
"""
@@ -95,25 +86,39 @@ class test_update(object):
csr = self.read_file("test2.csr")
request = pkcs10.load_certificate_request(csr)
- attributes = request.get_attributes()
- subject = request.get_subject()
- components = subject.get_components()
- compdict = dict(components)
- attrdict = dict(attributes)
-
- assert(compdict['CN'] == u'test.example.com')
- assert(compdict['ST'] == u'California')
- assert(compdict['C'] == u'US')
-
- extensions = attrdict['1.2.840.113549.1.9.14']
-
- for ext in range(len(extensions)):
- if extensions[ext][0] == '2.5.29.17':
- names = extensions[ext][2]
- # check the dNSName field
- assert(names[2] == [u'testlow.example.com'])
- if extensions[ext][0] == '2.5.29.31':
- urls = extensions[ext][2]
- assert(len(urls) == 2)
- assert(urls[0] == u'http://ca.example.com/my.crl')
- assert(urls[1] == u'http://other.example.com/my.crl')
+ subject = pkcs10.get_subject(request)
+
+ assert(subject.common_name == 'test.example.com')
+ assert(subject.state_name == 'California')
+ assert(subject.country_name == 'US')
+
+ for extension in request.extensions:
+ if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME:
+ assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com'
+ if extension.oid_tag == nss.SEC_OID_X509_CRL_DIST_POINTS:
+ pts = nss.CRLDistributionPts(extension.value)
+ urls = pts[0].get_general_names()
+ assert('http://ca.example.com/my.crl' in urls)
+ assert('http://other.example.com/my.crl' in urls)
+
+ def test_3(self):
+ """
+ Test CSR with base64-encoded bogus data
+ """
+ csr = self.read_file("test3.csr")
+
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except NSPRError, nsprerr:
+ # (SEC_ERROR_BAD_DER) security library: improperly formatted DER-encoded message.
+ assert(nsprerr. errno== -8183)
+
+ def test_4(self):
+ """
+ Test CSR with badly formatted base64-encoded data
+ """
+ csr = self.read_file("test4.csr")
+ try:
+ request = pkcs10.load_certificate_request(csr)
+ except TypeError, typeerr:
+ assert(str(typeerr) == 'Incorrect padding')