diff options
-rw-r--r-- | ipa.spec.in | 5 | ||||
-rw-r--r-- | ipalib/pkcs10.py | 426 | ||||
-rw-r--r-- | ipalib/plugins/cert.py | 40 | ||||
-rw-r--r-- | ipapython/nsslib.py | 4 | ||||
-rw-r--r-- | ipaserver/install/dsinstance.py | 2 | ||||
-rw-r--r-- | ipaserver/plugins/selfsign.py | 28 | ||||
-rw-r--r-- | tests/test_ipalib/test_x509.py | 12 | ||||
-rw-r--r-- | tests/test_pkcs10/test3.csr | 3 | ||||
-rw-r--r-- | tests/test_pkcs10/test4.csr | 4 | ||||
-rw-r--r-- | tests/test_pkcs10/test5.csr | 20 | ||||
-rw-r--r-- | tests/test_pkcs10/test_pkcs10.py | 95 |
11 files changed, 158 insertions, 481 deletions
diff --git a/ipa.spec.in b/ipa.spec.in index 8bca7337..0ccf7018 100644 --- a/ipa.spec.in +++ b/ipa.spec.in @@ -176,7 +176,7 @@ Requires: python-kerberos >= 1.1-3 Requires: authconfig Requires: gnupg Requires: pyOpenSSL -Requires: python-nss >= 0.9 +Requires: python-nss >= 0.9-8 Requires: python-lxml %description python @@ -494,6 +494,9 @@ fi %endif %changelog +* Mon Jul 19 2010 Rob Crittenden <rcritten@redhat.com> - 1.99-25 +- Bump up minimum version of python-nss to pick up nss_is_initialize() API + * Thu Jun 24 2010 Adam Young <ayoung@redhat.com> - 1.99-24 - Removed python-asset based webui diff --git a/ipalib/pkcs10.py b/ipalib/pkcs10.py index 9119d12e..3011c1f7 100644 --- a/ipalib/pkcs10.py +++ b/ipalib/pkcs10.py @@ -1,7 +1,7 @@ # Authors: # Rob Crittenden <rcritten@redhat.com> # -# Copyright (C) 2009 Red Hat +# Copyright (C) 2010 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or @@ -17,356 +17,34 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# Read PKCS#10 certificate requests (see RFC 2986 and 5280) +import os +import sys +import base64 +import nss.nss as nss +from ipapython import ipautil +from ipalib import api -# NOTE: Not every extension is currently handled. Known to now work: -# 2.5.29.37 - extKeyUsage +PEM = 0 +DER = 1 -import sys, string, base64 -from pyasn1.type import base,tag,namedtype,namedval,univ,constraint,char,useful -from pyasn1.codec.der import decoder, encoder -from pyasn1 import error -import copy - -# Common OIDs found in a subject -oidtable = { "2.5.4.3": "CN", - "2.5.4.6": "C", - "2.5.4.7": "L", - "2.5.4.8": "ST", - "2.5.4.10": "O", - "2.5.4.11": "OU", - "1.2.840.113549.1.9.1": "E", - "0.9.2342.19200300.100.1.25": "DC", - } - -# Some useful OIDs -FRIENDLYNAME = '1.2.840.113549.1.9.20' -EXTENSIONREQUEST = '1.2.840.113549.1.9.14' - -MAX = 32 # from mozilla/security/nss/lib/util/secasn1t.h - -class DirectoryString(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), - namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), - namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), - namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), - namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))), - ) - -class AttributeValue(DirectoryString): pass - -class AttributeType(univ.ObjectIdentifier): pass - -class AttributeTypeAndValue(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('type', AttributeType()), - namedtype.NamedType('value', AttributeValue()) # FIXME, could be any type - ) - -class KeyPurposeId(univ.ObjectIdentifier): pass - -class ExtKeyUsageSyntax(univ.SequenceOf): - componentType = KeyPurposeId() - -class UPN(char.UTF8String): - tagSet = char.UTF8String.tagSet.tagExplicitly( - tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 0) - ) - -class AttributeValueSet(univ.SetOf): - componentType = univ.Any() - sizeSpec = univ.SetOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) - -class Attribute(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('type', AttributeType()), - namedtype.NamedType('values', AttributeValueSet()), - ) - -class Attributes(univ.SetOf): - componentType = Attribute() - -class RelativeDistinguishedName(univ.SetOf): - componentType = AttributeTypeAndValue() - -class RDNSequence(univ.SequenceOf): - componentType = RelativeDistinguishedName() - -class Name(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('', RDNSequence()) - ) - - def get_components(self): - components = self.getComponentByPosition(0) - complist = [] - for idx in range(len(components)): - attrandvalue = components[idx].getComponentByPosition(0) - oid = attrandvalue.getComponentByPosition(0) - # FIXME, should handle any string type - value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet) - if value is None: - value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet) - if value is None: - value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet) - vout = value.prettyOut(value).decode('utf-8') - oidout = oid.prettyOut(oid).decode('utf-8') - c = ((oidtable.get(oidout, oidout), vout)) - complist.append(c) - - return tuple(complist) - -class AnotherName(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('type-id', univ.ObjectIdentifier()), - namedtype.NamedType('value', univ.Any()) - ) - -class rfc822Name(char.IA5String): - tagSet = char.IA5String.tagSet.tagImplicitly( - tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1) - ) - -class dNSName(char.IA5String): - tagSet = char.IA5String.tagSet.tagImplicitly( - tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2) - ) - -class x400Address(univ.OctetString): - tagSet = univ.OctetString.tagSet.tagImplicitly( - tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 3) - ) - -class directoryName(Name): - tagSet = Name.tagSet.tagImplicitly( - tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 4) - ) - -class uniformResourceIdentifier(char.IA5String): - tagSet = char.IA5String.tagSet.tagImplicitly( - tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 6) - ) - -# Not all general types are handled, nor are these necessarily done -# per the specification. -class GeneralName(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('otherName', AnotherName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), - namedtype.NamedType('rfc822Name', rfc822Name()), #1 - namedtype.NamedType('dNSName', dNSName()), #2 - namedtype.NamedType('x400Address', x400Address()), #3 - namedtype.NamedType('directoryName', directoryName()), #4 - # 5 FIXME - namedtype.NamedType('uniformResourceIdentifier', uniformResourceIdentifier()), #6 -# namedtype.NamedType('uniformResourceIdentifier', char.IA5String(tagSet=char.IA5String.tagSet.tagImplicitly(tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 6)))), - ) - -class GeneralNames(univ.SequenceOf): - componentType = GeneralName() - sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) - -class SubjectAltName(univ.SequenceOf): - componentType = GeneralName() - sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) - -class DistributionPointName(univ.Choice): - componentType = namedtype.NamedTypes( - namedtype.NamedType('fullName', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), - namedtype.NamedType('nameRelativeToCRLIssuer', RelativeDistinguishedName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), - ) - -class DistributionPoint(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.OptionalNamedType('distributionPoint', DistributionPointName().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))), - namedtype.OptionalNamedType('reasons', univ.BitString().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))), # FIXME - namedtype.OptionalNamedType('cRLIssuer', GeneralNames().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))), - ) - -class cRLDistributionPoints(univ.SequenceOf): - componentType = DistributionPoint() - sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX) - -class basicConstraints(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.DefaultedNamedType('cA', univ.Boolean('False')), - namedtype.OptionalNamedType('pathLenConstraint', univ.Integer()), - ) - -class AlgorithmIdentifier(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('algorithm', univ.ObjectIdentifier()), - namedtype.OptionalNamedType('parameters', univ.Any()) - ) - -class SubjectPublicKeyInfo(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('algorithm', AlgorithmIdentifier()), - namedtype.NamedType('subjectPublicKey', univ.BitString()) - ) - -class Version(univ.Integer): pass - -class CertificationRequestInfo(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('version', Version()), - namedtype.NamedType('subject', Name()), - namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()), - namedtype.OptionalNamedType('attributes', Attributes().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0))) - ) - -class CertificationRequest(univ.Sequence): - componentType = namedtype.NamedTypes( - namedtype.NamedType('certificationRequestInfo', CertificationRequestInfo()), - namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()), - namedtype.NamedType('signatureValue', univ.BitString()) - ) - - def get_version(self): - info = self.getComponentByName('certificationRequestInfo') - version = info.getComponentByName('version') - return version._value - - def get_subject(self): - info = self.getComponentByName('certificationRequestInfo') - return info.getComponentByName('subject') - - def get_subjectaltname(self): - attrs = self.get_attributes() - attrdict = dict(attrs) - if EXTENSIONREQUEST in attrdict: - # Extensions are a 3 position tuple - for ext in attrdict[EXTENSIONREQUEST]: - if ext[0] == '2.5.29.17': - # alt name is in the dNSName position - return ext[2][2] - - def get_attributes(self): - info = self.getComponentByName('certificationRequestInfo') - attrs = info.getComponentByName('attributes') - attributes = [] - - for idx in range(len(attrs)): - atype = attrs[idx].getComponentByPosition(0) - aval = attrs[idx].getComponentByPosition(1) - - # The attribute list is of type Any, need to re-encode - aenc = encoder.encode(aval, maxChunkSize=1024) - decoded = decoder.decode(aenc)[0] - oid = atype.prettyOut(atype) - - if oid == "1.2.840.113549.1.9.20": # PKCS#9 Friendly Name - value = decoded.getComponentByPosition(0) - t = (oid, value.prettyOut(value).decode('utf-8')) - attributes.append(t) - elif oid == "1.2.840.113549.1.9.14": # PKCS#9 Extension Req - extensions = [] - extlist = decoded.getComponentByPosition(0) - for jdx in range(len(extlist)): - ext = extlist.getComponentByPosition(jdx) - # An extension has 3 elements: - # oid - # bool - critical - # value - if len(ext) == 2: # If no critical, default to False - extoid = atype.prettyOut(ext.getComponentByPosition(0)) - critical = False - extvalue = ext.getComponentByPosition(1) - else: - extoid = atype.prettyOut(ext.getComponentByPosition(0)) - critical = bool(ext.getComponentByPosition(1)._value) - extvalue = ext.getComponentByPosition(2) - - if extoid == '2.5.29.19': # basicConstraints - extdecoded = decoder.decode(extvalue._value, asn1Spec=basicConstraints())[0] - ca = bool(extdecoded[0]) - if len(extdecoded) == 2: # path length is optional - pathlen = extdecoded[1]._value - else: - pathlen = None - constraint = (ca, pathlen) - e = (extoid, critical, constraint) - extensions.append(e) - continue - elif extoid == '2.5.29.31': # cRLDistributionPoints - extdecoded = decoder.decode(extvalue._value, asn1Spec=cRLDistributionPoints())[0] - distpoints = [] - for elem in range(len(extdecoded)): - name = extdecoded[elem] - # DistributionPoint is position 0 - distpoint = name.getComponentByPosition(0) - # fullName is position 0 - fullname = distpoint.getComponentByPosition(0) - for crl in range(len(fullname)): - # Get the GeneralName, URI type - uri = fullname.getComponentByPosition(crl).getComponentByPosition(5) - distpoints.append(uri.prettyOut(uri).decode('utf-8')) - e = (extoid, critical, tuple(distpoints)) - extensions.append(e) - continue - - # The data is is encoded as "Any". Pull the raw data out - # and re-decode it using a different specification. - try: - extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralNames())[0] - except error.PyAsn1Error: - # I've seen CSRs where this isn't a sequence of names - # but is a single name, try to handle that too. - try: - extdecoded = decoder.decode(extvalue._value, asn1Spec=GeneralName())[0] - extdecoded = [extdecoded] - except error.PyAsn1Error, e: - # skip for now - generalnames = 9*["Error"] - e = (extoid, critical, tuple(generalnames)) - extensions.append(e) - continue - - # We now have a list of extensions in the order they - # are in the request as GeneralNames. We iterate through - # each of those to get a GeneralName. We then have to - # iterate through that to find the position set in it. - - # Note that not every type will be returned. Those that - # are handled are returned in a tuple in the position - # which they are in the request. - generalnames = 9*[None] - for elem in range(len(extdecoded)): - name = extdecoded[elem] - for n in range(len(name)): - if name[n] is None: - continue - if generalnames[n] is None: - generalnames[n] = [] - if n == 3: # OctetString - generalnames[n].append(name[n]._value) - if n in [1, 2, 6]: # IA5String - if n == 6 and extoid == "2.5.29.37": - # Extended key usage - v = copy.deepcopy(extvalue._value) - othername = decoder.decode(v, asn1Spec=ExtKeyUsageSyntax())[0] - keyusage = [] - for l in range(len(othername)): - keyusage.append(othername[l].prettyOut(othername[l])) +def get_subjectaltname(request): + """ + Given a CSR return the subjectaltname value, if any. - generalnames[n] = tuple(keyusage) - else: - generalnames[n].append(name[n].prettyOut(name[n]).decode('utf-8')) - if n == 0: # AnotherName - nameoid = name[n].getComponentByPosition(0) - nameoid = nameoid.prettyOut(nameoid) - val = name[n].getComponentByPosition(1) - if nameoid == "1.3.6.1.4.1.311.20.2.3": # UPN - v = copy.deepcopy(val._value) - othername = decoder.decode(v, asn1Spec=UPN())[0] - generalnames[0].append(othername.prettyOut(othername).decode('utf-8')) + The return value is a tuple of strings or None + """ + for extension in request.extensions: + if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: + return nss.x509_alt_name(extension.value) + return None - e = (extoid, critical, tuple(generalnames)) - extensions.append(e) - t = (oid, tuple(extensions)) - attributes.append(t) +def get_subject(request): + """ + Given a CSR return the subject value. - return tuple(attributes) + This returns an nss.DN object. + """ + return request.subject def strip_header(csr): """ @@ -392,50 +70,26 @@ def load_certificate_request(csr): substrate = base64.b64decode(csr) - return decoder.decode(substrate, asn1Spec=CertificationRequest())[0] + # A fail-safe so we can always read a CSR. python-nss/NSS will segfault + # otherwise + if not nss.nss_is_initialized(): + nss.nss_init_nodb() -if __name__ == '__main__': - # Read PEM certs from stdin and print them out in plain text - - stSpam, stHam, stDump = 0, 1, 2 - state = stSpam + return nss.CertificateRequest(substrate) - for certLine in sys.stdin.readlines(): - certLine = string.strip(certLine) - if state == stSpam: - if state == stSpam: - if certLine == '-----BEGIN NEW CERTIFICATE REQUEST-----': - certLines = [] - state = stHam - continue - if state == stHam: - if certLine == '-----END NEW CERTIFICATE REQUEST-----': - state = stDump - else: - certLines.append(certLine) - complist = [] - if state == stDump: - substrate = '' - for certLine in certLines: - substrate = substrate + base64.b64decode(certLine) +if __name__ == '__main__': + nss.nss_init_nodb() - request = decoder.decode(substrate, asn1Spec=CertificationRequest())[0] - subject = request.get_subject() - attrs = request.get_attributes() - print "Attributes:" - print attrs + # Read PEM request from stdin and print out its components - print "Subject:" - complist = subject.get_components() - print complist - out="" - for c in complist: - out = out + "%s=%s," % (c[0], c[1]) - print out[:-1] + csrlines = sys.stdin.readlines() + csrlines = fp.readlines() + fp.close() + csr = ''.join(csrlines) - print request.get_subjectaltname() + csr = load_certificate_request(csr) - # Re-encode the request just to be sure things are working - assert encoder.encode(request, maxChunkSize=1024) == substrate, 'cert recode fails' + print csr - state = stSpam + print get_subject(csr) + print get_subjectaltname(csr) diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 1de4ac64..ed1d65ad 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -69,7 +69,6 @@ from ipalib import x509 from ipalib.plugins.virtual import * from ipalib.plugins.service import split_principal import base64 -from pyasn1.error import PyAsn1Error import logging import traceback from ipalib.text import _ @@ -77,37 +76,31 @@ from ipalib.request import context from ipalib.output import Output from ipalib.plugins.service import validate_principal import nss.nss as nss +from nss.error import NSPRError def get_csr_hostname(csr): """ - Return the value of CN in the subject of the request + Return the value of CN in the subject of the request or None """ try: request = pkcs10.load_certificate_request(csr) - sub = request.get_subject().get_components() - for s in sub: - if s[0].lower() == "cn": - return s[1] - except PyAsn1Error: - # The ASN.1 decoding errors tend to be long and involved and the - # last bit is generally not interesting. We need the whole traceback. - logging.error('Unable to decode CSR\n%s', traceback.format_exc()) - raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request')) - - return None + subject = pkcs10.get_subject(request) + return subject.common_name + except NSPRError, nsprerr: + raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request:')) def get_subjectaltname(csr): """ - Return the value of the subject alt name, if any + Return the first value of the subject alt name, if any """ try: request = pkcs10.load_certificate_request(csr) - except PyAsn1Error: - # The ASN.1 decoding errors tend to be long and involved and the - # last bit is generally not interesting. We need the whole traceback. - logging.error('Unable to decode CSR\n%s', traceback.format_exc()) + for extension in request.extensions: + if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: + return nss.x509_alt_name(extension.value)[0] + return None + except NSPRError, nsprerr: raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request')) - return request.get_subjectaltname() def validate_csr(ugettext, csr): """ @@ -116,13 +109,9 @@ def validate_csr(ugettext, csr): """ try: request = pkcs10.load_certificate_request(csr) - - # Explicitly request the attributes. This fires off additional - # decoding to get things like the subjectAltName. - attrs = request.get_attributes() except TypeError, e: raise errors.Base64DecodeError(reason=str(e)) - except PyAsn1Error: + except NSPRError: raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request')) except Exception, e: raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e)) @@ -290,7 +279,8 @@ class cert_request(VirtualCommand): raise errors.ACIError(info="Insufficient 'write' privilege to the 'userCertificate' attribute of entry '%s'." % dn) # Validate the subject alt name, if any - subjectaltname = get_subjectaltname(csr) + request = pkcs10.load_certificate_request(csr) + subjectaltname = pkcs10.get_subjectaltname(request) if subjectaltname is not None: for name in subjectaltname: try: diff --git a/ipapython/nsslib.py b/ipapython/nsslib.py index 02bff00a..7e249b3b 100644 --- a/ipapython/nsslib.py +++ b/ipapython/nsslib.py @@ -122,6 +122,10 @@ class NSSConnection(httplib.HTTPConnection): raise RuntimeError("dbdir is required") logging.debug('%s init %s', self.__class__.__name__, host) + if nss.nss_is_initialized(): + # close any open NSS database and use the new one + ssl.clear_session_cache() + nss.nss_shutdown() nss.nss_init(dbdir) ssl.set_domestic_policy() nss.set_password_callback(self.password_callback) diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py index e1ddef39..3ea9c94c 100644 --- a/ipaserver/install/dsinstance.py +++ b/ipaserver/install/dsinstance.py @@ -366,7 +366,7 @@ class DsInstance(service.Service): self._ldap_mod("ipa-winsync-conf.ldif") def __config_version_module(self): - self._ldap_mod("ipa-version-conf.ldif") + self._ldap_mod("version-conf.ldif") def __user_private_groups(self): if has_managed_entries(self.host_name, self.dm_password): diff --git a/ipaserver/plugins/selfsign.py b/ipaserver/plugins/selfsign.py index 39d1c539..5333a89a 100644 --- a/ipaserver/plugins/selfsign.py +++ b/ipaserver/plugins/selfsign.py @@ -45,10 +45,9 @@ import re from ipaserver.plugins import rabase from ipaserver.install import certs import tempfile -from pyasn1 import error from ipalib import _ -from pyasn1.codec.der import encoder from ipalib.plugins.cert import get_csr_hostname +from nss.error import NSPRError class ra(rabase.rabase): """ @@ -87,23 +86,19 @@ class ra(rabase.rabase): config = api.Command['config_show']()['result'] subject_base = config.get('ipacertificatesubjectbase')[0] hostname = get_csr_hostname(csr) - request = pkcs10.load_certificate_request(csr) base = re.split(',\s*(?=\w+=)', subject_base) - base.reverse() - base.append("CN=%s" % hostname) - request_subject = request.get_subject().get_components() - new_request = [] - for r in request_subject: - new_request.append("%s=%s" % (r[0], r[1])) - - if str(base).lower() != str(new_request).lower(): - subject_base='CN=%s, %s' % (hostname, subject_base) - new_request.reverse() + base.insert(0,'CN=%s' % hostname) + subject_base = ",".join(base) + request = pkcs10.load_certificate_request(csr) + # python-nss normalizes the request subject + request_subject = str(pkcs10.get_subject(request)) + + if str(subject_base).lower() != request_subject.lower(): raise errors.CertificateOperationError(error=_('Request subject "%(request_subject)s" does not match the form "%(subject_base)s"') % \ - {'request_subject' : ', '.join(new_request), 'subject_base' : subject_base}) + {'request_subject' : request_subject, 'subject_base' : subject_base}) except errors.CertificateOperationError, e: raise e - except Exception, e: + except NSPRError, e: raise errors.CertificateOperationError(error=_('unable to decode csr: %s' % e)) # certutil wants the CSR to have have a header and footer. Add one @@ -207,11 +202,10 @@ class ra(rabase.rabase): pass try: - # Grab the subject, reverse it, combine it and return it subject = x509.get_subject(cert) serial = x509.get_serial_number(cert) - except error.PyAsn1Error, e: + except NSPRError, e: self.log.error('Unable to decode certificate in entry: %s' % str(e)) raise errors.CertificateOperationError(error='Unable to decode certificate in entry: %s' % str(e)) diff --git a/tests/test_ipalib/test_x509.py b/tests/test_ipalib/test_x509.py index 50e827ca..ca21e28c 100644 --- a/tests/test_ipalib/test_x509.py +++ b/tests/test_ipalib/test_x509.py @@ -92,18 +92,18 @@ class test_x509(object): Test retrieving the subject """ subject = x509.get_subject(goodcert) - assert subject == 'CN=ipa.example.com,O=IPA' + assert str(subject) == 'CN=ipa.example.com,O=IPA' der = base64.b64decode(goodcert) subject = x509.get_subject(der, x509.DER) - assert subject == 'CN=ipa.example.com,O=IPA' + assert str(subject) == 'CN=ipa.example.com,O=IPA' # We should be able to pass in a tuple/list of certs too subject = x509.get_subject((goodcert)) - assert subject == 'CN=ipa.example.com,O=IPA' + assert str(subject) == 'CN=ipa.example.com,O=IPA' subject = x509.get_subject([goodcert]) - assert subject == 'CN=ipa.example.com,O=IPA' + assert str(subject) == 'CN=ipa.example.com,O=IPA' def test_2_get_serial_number(self): """ @@ -132,8 +132,8 @@ class test_x509(object): cert = x509.load_certificate(goodcert) - assert cert.subject == 'CN=ipa.example.com,O=IPA' - assert cert.issuer == 'CN=IPA Test Certificate Authority' + assert str(cert.subject) == 'CN=ipa.example.com,O=IPA' + assert str(cert.issuer) == 'CN=IPA Test Certificate Authority' assert cert.serial_number == 1093 assert cert.valid_not_before_str == 'Fri Jun 25 13:00:42 2010 UTC' assert cert.valid_not_after_str == 'Thu Jun 25 13:00:42 2015 UTC' diff --git a/tests/test_pkcs10/test3.csr b/tests/test_pkcs10/test3.csr new file mode 100644 index 00000000..82c84d15 --- /dev/null +++ b/tests/test_pkcs10/test3.csr @@ -0,0 +1,3 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +VGhpcyBpcyBhbiBpbnZhbGlkIENTUg== +-----END NEW CERTIFICATE REQUEST----- diff --git a/tests/test_pkcs10/test4.csr b/tests/test_pkcs10/test4.csr new file mode 100644 index 00000000..9f08b802 --- /dev/null +++ b/tests/test_pkcs10/test4.csr @@ -0,0 +1,4 @@ +-----BEGIN NEW CERTIFICATE REQUEST----- +Invalidate data +-----END NEW CERTIFICATE REQUEST----- + diff --git a/tests/test_pkcs10/test5.csr b/tests/test_pkcs10/test5.csr new file mode 100644 index 00000000..41c3c1f3 --- /dev/null +++ b/tests/test_pkcs10/test5.csr @@ -0,0 +1,20 @@ + +Certificate request generated by Netscape certutil +Phone: (not specified) + +Common Name: test.example.com +Email: (not specified) +Organization: IPA +State: (not specified) +Country: (not specified) + +-----BEGIN NEW CERTIFICATE REQUEST----- +MIIBaDCB0gIBADApMQwwCgYDVQQKEwNJUEExGTAXBgNVBAMTEHRlc3QuZXhhbXBs
+ZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAPnSCLwl7IytP2HC7+zv
+nI2fe6oRCE/J8K1jIoiqS9engx3Yfe4kaXWWzcwmuUV57VhUmWDEQIbSREPdrVSi
+tWC55ilGmPOAEw+mP4qg6Ctb+d8Egmy1JVrpIYCLNXvEd3dAaimB0J+K3hKFRyHI
+2MzrIuFqqohRijkDLwB8oVVdAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQACt37K
+j+RMEbqG8s0Uxs3FhcfiAx8Do99CDizY/b7hZEgMyG4dLmm+vSCBbxBrG5oMlxJD
+dxnpk0PQSknNkJVrCS/J1OTpOPRTi4VKATT3tHJAfDbWZTwcSelUCLQ4lREiuT3D
+WP4vKrLIxDJDb+/mwuV7WWo34E6MD9iTB1xINg== +-----END NEW CERTIFICATE REQUEST----- diff --git a/tests/test_pkcs10/test_pkcs10.py b/tests/test_pkcs10/test_pkcs10.py index 66d205b9..4c8ba136 100644 --- a/tests/test_pkcs10/test_pkcs10.py +++ b/tests/test_pkcs10/test_pkcs10.py @@ -26,6 +26,8 @@ import nose from tests.util import raises, PluginTester from ipalib import pkcs10 from ipapython import ipautil +import nss.nss as nss +from nss.error import NSPRError class test_update(object): """ @@ -33,6 +35,7 @@ class test_update(object): """ def setUp(self): + nss.nss_init_nodb() if ipautil.file_exists("test0.csr"): self.testdir="./" elif ipautil.file_exists("tests/test_pkcs10/test0.csr"): @@ -53,15 +56,11 @@ class test_update(object): csr = self.read_file("test0.csr") request = pkcs10.load_certificate_request(csr) - attributes = request.get_attributes() - subject = request.get_subject() - components = subject.get_components() - compdict = dict(components) + subject = pkcs10.get_subject(request) - assert(attributes == ()) - assert(compdict['CN'] == u'test.example.com') - assert(compdict['ST'] == u'California') - assert(compdict['C'] == u'US') + assert(subject.common_name == 'test.example.com') + assert(subject.state_name == 'California') + assert(subject.country_name == 'US') def test_1(self): """ @@ -70,23 +69,15 @@ class test_update(object): csr = self.read_file("test1.csr") request = pkcs10.load_certificate_request(csr) - attributes = request.get_attributes() - subject = request.get_subject() - components = subject.get_components() - compdict = dict(components) - attrdict = dict(attributes) + subject = pkcs10.get_subject(request) - assert(compdict['CN'] == u'test.example.com') - assert(compdict['ST'] == u'California') - assert(compdict['C'] == u'US') + assert(subject.common_name == 'test.example.com') + assert(subject.state_name == 'California') + assert(subject.country_name == 'US') - extensions = attrdict['1.2.840.113549.1.9.14'] - - for ext in range(len(extensions)): - if extensions[ext][0] == '2.5.29.17': - names = extensions[ext][2] - # check the dNSName field - assert(names[2] == [u'testlow.example.com']) + for extension in request.extensions: + if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: + assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com' def test_2(self): """ @@ -95,25 +86,39 @@ class test_update(object): csr = self.read_file("test2.csr") request = pkcs10.load_certificate_request(csr) - attributes = request.get_attributes() - subject = request.get_subject() - components = subject.get_components() - compdict = dict(components) - attrdict = dict(attributes) - - assert(compdict['CN'] == u'test.example.com') - assert(compdict['ST'] == u'California') - assert(compdict['C'] == u'US') - - extensions = attrdict['1.2.840.113549.1.9.14'] - - for ext in range(len(extensions)): - if extensions[ext][0] == '2.5.29.17': - names = extensions[ext][2] - # check the dNSName field - assert(names[2] == [u'testlow.example.com']) - if extensions[ext][0] == '2.5.29.31': - urls = extensions[ext][2] - assert(len(urls) == 2) - assert(urls[0] == u'http://ca.example.com/my.crl') - assert(urls[1] == u'http://other.example.com/my.crl') + subject = pkcs10.get_subject(request) + + assert(subject.common_name == 'test.example.com') + assert(subject.state_name == 'California') + assert(subject.country_name == 'US') + + for extension in request.extensions: + if extension.oid_tag == nss.SEC_OID_X509_SUBJECT_ALT_NAME: + assert nss.x509_alt_name(extension.value)[0] == 'testlow.example.com' + if extension.oid_tag == nss.SEC_OID_X509_CRL_DIST_POINTS: + pts = nss.CRLDistributionPts(extension.value) + urls = pts[0].get_general_names() + assert('http://ca.example.com/my.crl' in urls) + assert('http://other.example.com/my.crl' in urls) + + def test_3(self): + """ + Test CSR with base64-encoded bogus data + """ + csr = self.read_file("test3.csr") + + try: + request = pkcs10.load_certificate_request(csr) + except NSPRError, nsprerr: + # (SEC_ERROR_BAD_DER) security library: improperly formatted DER-encoded message. + assert(nsprerr. errno== -8183) + + def test_4(self): + """ + Test CSR with badly formatted base64-encoded data + """ + csr = self.read_file("test4.csr") + try: + request = pkcs10.load_certificate_request(csr) + except TypeError, typeerr: + assert(str(typeerr) == 'Incorrect padding') |