summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2010-06-24 11:40:02 -0400
committerRob Crittenden <rcritten@redhat.com>2010-07-15 10:51:49 -0400
commit8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c (patch)
treec364bfb5b5926a165f1e6bc29e355131636afe45
parent1e1985b17c3988056bef045fa84a9c7aaf0c4c65 (diff)
downloadfreeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.tar.gz
freeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.tar.xz
freeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.zip
Clean up crypto code, take advantage of new nss-python capabilities
This patch does the following: - drops our in-tree x509v3 parser to use the python-nss one - return more information on certificates - make an API change, renaming cert-get to cert-show - Drop a lot of duplicated code
-rw-r--r--ipalib/plugins/cert.py165
-rw-r--r--ipalib/plugins/host.py6
-rw-r--r--ipalib/plugins/service.py25
-rw-r--r--ipalib/x509.py288
-rw-r--r--ipapython/nsslib.py1
5 files changed, 147 insertions, 338 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 17e4c46b0..1de4ac64e 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -44,7 +44,7 @@ EXAMPLES:
ipa cert-request --add --principal=HTTP/lion.example.com example.csr
Retrieve an existing certificate:
- ipa cert-request 1032
+ ipa cert-show 1032
Revoke a certificate (see RFC 5280 for reason details):
ipa cert-revoke --revocation-reason=6 1032
@@ -75,53 +75,8 @@ import traceback
from ipalib.text import _
from ipalib.request import context
from ipalib.output import Output
-
-def get_serial(certificate):
- """
- Given a certificate, return the serial number in that cert
- as a Python long object.
-
- In theory there should be only one cert per object so even if we get
- passed in a list/tuple only return the first one.
- """
- if type(certificate) in (list, tuple):
- certificate = certificate[0]
- try:
- certificate = base64.b64decode(certificate)
- except Exception, e:
- pass
- try:
-
- serial = x509.get_serial_number(certificate, x509.DER)
- except PyAsn1Error:
- raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
-
- return serial
-
-def get_subject(certificate):
- """
- Given a certificate, return the subject
-
- In theory there should be only one cert per object so even if we get
- passed in a list/tuple only return the first one.
- """
- if type(certificate) in (list, tuple):
- certificate = certificate[0]
- try:
- certificate = base64.b64decode(certificate)
- except Exception, e:
- pass
- try:
- sub = list(x509.get_subject_components(certificate, type=x509.DER))
- sub.reverse()
- except PyAsn1Error:
- raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
-
- subject = ""
- for s in sub:
- subject = subject + "%s=%s," % (s[0], s[1])
-
- return subject[:-1]
+from ipalib.plugins.service import validate_principal
+import nss.nss as nss
def get_csr_hostname(csr):
"""
@@ -192,6 +147,20 @@ def normalize_csr(csr):
return csr
+def get_host_from_principal(principal):
+ """
+ Given a principal with or without a realm return the
+ host portion.
+ """
+ validate_principal(None, principal)
+ realm = principal.find('@')
+ slash = principal.find('/')
+ if realm == -1:
+ realm = len(principal)
+ hostname = principal[slash+1:realm]
+
+ return hostname
+
class cert_request(VirtualCommand):
"""
Submit a certificate signing request.
@@ -219,6 +188,9 @@ class cert_request(VirtualCommand):
default=False,
autofill=True
),
+ )
+
+ has_output_params = (
Str('certificate?',
label=_('Certificate'),
flags=['no_create', 'no_update', 'no_search'],
@@ -227,6 +199,26 @@ class cert_request(VirtualCommand):
label=_('Subject'),
flags=['no_create', 'no_update', 'no_search'],
),
+ Str('issuer?',
+ label=_('Issuer'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('valid_not_before?',
+ label=_('Not Before'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('valid_not_after?',
+ label=_('Not After'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('md5_fingerprint?',
+ label=_('Fingerprint (MD5)'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('sha1_fingerprint?',
+ label=_('Fingerprint (SHA1)'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
Str('serial_number?',
label=_('Serial number'),
flags=['no_create', 'no_update', 'no_search'],
@@ -281,11 +273,7 @@ class cert_request(VirtualCommand):
service = api.Command['service_show'](principal, all=True, raw=True)['result']
dn = service['dn']
else:
- realm = principal.find('@')
- if realm == -1:
- realm = len(principal)
- hostname = principal[5:realm]
-
+ hostname = get_host_from_principal(principal)
service = api.Command['host_show'](hostname, all=True, raw=True)['result']
dn = service['dn']
except errors.NotFound, e:
@@ -319,12 +307,12 @@ class cert_request(VirtualCommand):
raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
if 'usercertificate' in service:
- serial = get_serial(base64.b64encode(service['usercertificate'][0]))
+ serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
# revoke the certificate and remove it from the service
# entry before proceeding. First we retrieve the certificate to
# see if it is already revoked, if not then we revoke it.
try:
- result = api.Command['cert_get'](unicode(serial))['result']
+ result = api.Command['cert_show'](unicode(serial))['result']
if 'revocation_reason' not in result:
try:
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
@@ -334,10 +322,20 @@ class cert_request(VirtualCommand):
except errors.NotImplementedError:
# some CA's might not implement get
pass
- api.Command['service_mod'](principal, usercertificate=None)
+ if not principal.startswith('host/'):
+ api.Command['service_mod'](principal, usercertificate=None)
+ else:
+ hostname = get_host_from_principal(principal)
+ api.Command['host_mod'](hostname, usercertificate=None)
# Request the certificate
result = self.Backend.ra.request_certificate(csr, **kw)
+ cert = x509.load_certificate(result['certificate'])
+ result['issuer'] = unicode(cert.issuer)
+ result['valid_not_before'] = unicode(cert.valid_not_before_str)
+ result['valid_not_after'] = unicode(cert.valid_not_after_str)
+ result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+ result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
# Success? Then add it to the service entry.
if 'certificate' in result:
@@ -345,10 +343,7 @@ class cert_request(VirtualCommand):
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['service_mod'](principal, **skw)
else:
- realm = principal.find('@')
- if realm == -1:
- realm = len(principal)
- hostname = principal[5:realm]
+ hostname = get_host_from_principal(principal)
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['host_mod'](hostname, **skw)
@@ -370,10 +365,9 @@ class cert_status(VirtualCommand):
flags=['no_create', 'no_update', 'no_search'],
),
)
- takes_options = (
- Str('cert_request_status?',
+ has_output_params = (
+ Str('cert_request_status',
label=_('Request status'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "certificate status"
@@ -393,25 +387,37 @@ _serial_number = Str('serial_number',
doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
)
-class cert_get(VirtualCommand):
+class cert_show(VirtualCommand):
"""
Retrieve an existing certificate.
"""
takes_args = _serial_number
- takes_options = (
- Str('certificate?',
+ has_output_params = (
+ Str('certificate',
label=_('Certificate'),
- flags=['no_create', 'no_update', 'no_search'],
),
- Str('subject?',
+ Str('subject',
label=_('Subject'),
- flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('issuer',
+ label=_('Issuer'),
+ ),
+ Str('valid_not_before',
+ label=_('Not Before'),
+ ),
+ Str('valid_not_after',
+ label=_('Not After'),
+ ),
+ Str('md5_fingerprint',
+ label=_('Fingerprint (MD5)'),
+ ),
+ Str('sha1_fingerprint',
+ label=_('Fingerprint (SHA1)'),
),
Str('revocation_reason?',
label=_('Revocation reason'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
@@ -420,10 +426,16 @@ class cert_get(VirtualCommand):
def execute(self, serial_number):
self.check_access()
result=self.Backend.ra.get_certificate(serial_number)
- result['subject'] = get_subject(result['certificate'])
+ cert = x509.load_certificate(result['certificate'])
+ result['subject'] = unicode(cert.subject)
+ result['issuer'] = unicode(cert.issuer)
+ result['valid_not_before'] = unicode(cert.valid_not_before_str)
+ result['valid_not_after'] = unicode(cert.valid_not_after_str)
+ result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+ result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
return dict(result=result)
-api.register(cert_get)
+api.register(cert_show)
class cert_revoke(VirtualCommand):
@@ -433,10 +445,9 @@ class cert_revoke(VirtualCommand):
takes_args = _serial_number
- takes_options = (
- Flag('revoked?',
+ has_output_params = (
+ Flag('revoked',
label=_('Revoked'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "revoke certificate"
@@ -468,14 +479,12 @@ class cert_remove_hold(VirtualCommand):
takes_args = _serial_number
- takes_options = (
+ has_output_params = (
Flag('unrevoked?',
label=_('Unrevoked'),
- flags=['no_create', 'no_update', 'no_search'],
),
Str('error_string?',
label=_('Error'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "certificate remove hold"
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index b0d7289a8..b42cbbcb7 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -71,8 +71,8 @@ from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib.plugins.service import split_principal
from ipalib.plugins.service import validate_certificate
-from ipalib.plugins.service import get_serial
from ipalib import _, ngettext
+from ipalib import x509
import base64
@@ -291,10 +291,10 @@ class host_mod(LDAPUpdate):
if 'usercertificate' in entry_attrs_old:
# FIXME: what to do here? do we revoke the old cert?
fmt = 'entry already has a certificate, serial number: %s' % (
- get_serial(entry_attrs_old['usercertificate'])
+ x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
- # FIXME: should be in normalizer; see service_add
+ # FIXME: decoding should be in normalizer; see service_add
entry_attrs['usercertificate'] = base64.b64decode(cert)
return dn
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 623128bf1..37de3df42 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -64,26 +64,9 @@ from ipalib import api, errors
from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib import x509
-from pyasn1.error import PyAsn1Error
from ipalib import _, ngettext
-def get_serial(certificate):
- """
- Given a certificate, return the serial number in that
- cert as a Python long object.
- """
- if type(certificate) in (list, tuple):
- certificate = certificate[0]
-
- try:
- serial = x509.get_serial_number(certificate, type=x509.DER)
- except PyAsn1Error, e:
- raise errors.GenericError(
- format='Unable to decode certificate in entry: %s' % e
- )
- return serial
-
def split_principal(principal):
service = hostname = realm = None
@@ -194,6 +177,7 @@ class service_add(LDAPCreate):
cert = entry_attrs.get('usercertificate')
if cert:
+ cert = cert[0]
# FIXME: should be in a normalizer: need to fix normalizers
# to work on non-unicode data
entry_attrs['usercertificate'] = base64.b64decode(cert)
@@ -229,9 +213,10 @@ class service_del(LDAPDelete):
(dn, entry_attrs) = ldap.get_entry(dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
if cert:
- serial = unicode(get_serial(cert))
+ cert = cert[0]
+ serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
- result = api.Command['cert_get'](unicode(serial))['result']
+ result = api.Command['cert_show'](unicode(serial))['result']
if 'revocation_reason' not in result:
try:
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
@@ -267,7 +252,7 @@ class service_mod(LDAPUpdate):
if 'usercertificate' in entry_attrs_old:
# FIXME: what to do here? do we revoke the old cert?
fmt = 'entry already has a certificate, serial number: %s' % (
- get_serial(entry_attrs_old['usercertificate'])
+ x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
# FIXME: should be in normalizer; see service_add
diff --git a/ipalib/x509.py b/ipalib/x509.py
index 3c38a354e..bb765faad 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -1,199 +1,31 @@
-"""
-Imported from pyasn1 project:
-
-Copyright (c) 2005-2009 Ilya Etingof <ilya@glas.net>, all rights reserved.
-
-THIS SOFTWARE IS NOT FAULT TOLERANT AND SHOULD NOT BE USED IN ANY SITUATION
-ENDANGERING HUMAN LIFE OR PROPERTY.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
- * Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
- * Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
- * The name of the authors may not be used to endorse or promote products
- derived from this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS ``AS IS''
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR
-ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
-THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""
-
-"""
-Enhancements released under IPA GPLv2 only license
-"""
-
-# Read ASN.1/PEM X.509 certificates on stdin, parse each into plain text,
-# then build substrate from it
-import sys, string, base64
-from pyasn1.type import tag,namedtype,namedval,univ,constraint,char,useful
-from pyasn1.codec.der import decoder, encoder
-from pyasn1 import error
-
-# Would be autogenerated from ASN.1 source by a ASN.1 parser
-# X.509 spec (rfc2459)
+# Authors:
+# Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2010 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+import sys
+import base64
+import nss.nss as nss
+from ipapython import ipautil
+from ipalib import api
PEM = 0
DER = 1
-# Common OIDs found in a subject
-oidtable = { "2.5.4.3": "CN",
- "2.5.4.6": "C",
- "2.5.4.7": "L",
- "2.5.4.8": "ST",
- "2.5.4.10": "O",
- "2.5.4.11": "OU",
- "1.2.840.113549.1.9.1": "E",
- "0.9.2342.19200300.100.1.25": "DC",
- }
-
-MAX = 64 # XXX ?
-
-class DirectoryString(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('teletexString', char.TeletexString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('printableString', char.PrintableString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('universalString', char.UniversalString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('utf8String', char.UTF8String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('bmpString', char.BMPString().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))),
- namedtype.NamedType('ia5String', char.IA5String().subtype(subtypeSpec=constraint.ValueSizeConstraint(1, MAX))) # hm, this should not be here!? XXX
- )
-
-class AttributeValue(DirectoryString): pass
-
-class AttributeType(univ.ObjectIdentifier): pass
-
-class AttributeTypeAndValue(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('type', AttributeType()),
- namedtype.NamedType('value', AttributeValue())
- )
-
-class RelativeDistinguishedName(univ.SetOf):
- componentType = AttributeTypeAndValue()
-
-class RDNSequence(univ.SequenceOf):
- componentType = RelativeDistinguishedName()
-
-class Name(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('', RDNSequence())
- )
-
- def get_components(self):
- components = self.getComponentByPosition(0)
- complist = []
- for idx in range(len(components)):
- attrandvalue = components[idx].getComponentByPosition(0)
- oid = attrandvalue.getComponentByPosition(0)
- # FIXME, should handle any string type
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.PrintableString.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.UTF8String.tagSet)
- if value is None:
- value = attrandvalue.getComponentByPosition(1).getComponentByType(char.IA5String.tagSet)
- vout = value.prettyOut(value).decode('utf-8')
- oidout = oid.prettyOut(oid).decode('utf-8')
- c = ((oidtable.get(oidout, oidout), vout))
- complist.append(c)
-
- return tuple(complist)
-
-class AlgorithmIdentifier(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', univ.ObjectIdentifier()),
- namedtype.OptionalNamedType('parameters', univ.Null())
- # XXX syntax screwed?
-# namedtype.OptionalNamedType('parameters', univ.ObjectIdentifier())
- )
-
-class Extension(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('extnID', univ.ObjectIdentifier()),
- namedtype.DefaultedNamedType('critical', univ.Boolean('False')),
- namedtype.NamedType('extnValue', univ.OctetString())
- )
-
-class Extensions(univ.SequenceOf):
- componentType = Extension()
- sizeSpec = univ.SequenceOf.sizeSpec + constraint.ValueSizeConstraint(1, MAX)
-
-class SubjectPublicKeyInfo(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('algorithm', AlgorithmIdentifier()),
- namedtype.NamedType('subjectPublicKey', univ.BitString())
- )
-
-class UniqueIdentifier(univ.BitString): pass
-
-class Time(univ.Choice):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('utcTime', useful.UTCTime()),
- namedtype.NamedType('generalTime', useful.GeneralizedTime())
- )
-
-class Validity(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('notBefore', Time()),
- namedtype.NamedType('notAfter', Time())
- )
-
-class CertificateSerialNumber(univ.Integer): pass
-
-class Version(univ.Integer):
- namedValues = namedval.NamedValues(
- ('v1', 0), ('v2', 1), ('v3', 2)
- )
-
-class TBSCertificate(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.DefaultedNamedType('version', Version('v1', tagSet=Version.tagSet.tagExplicitly(tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 0)))),
- namedtype.NamedType('serialNumber', CertificateSerialNumber()),
- namedtype.NamedType('signature', AlgorithmIdentifier()),
- namedtype.NamedType('issuer', Name()),
- namedtype.NamedType('validity', Validity()),
- namedtype.NamedType('subject', Name()),
- namedtype.NamedType('subjectPublicKeyInfo', SubjectPublicKeyInfo()),
- namedtype.OptionalNamedType('issuerUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 1))),
- namedtype.OptionalNamedType('subjectUniqueID', UniqueIdentifier().subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 2))),
- namedtype.OptionalNamedType('extensions', Extensions().subtype(explicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatSimple, 3)))
- )
-
-class Certificate(univ.Sequence):
- componentType = namedtype.NamedTypes(
- namedtype.NamedType('tbsCertificate', TBSCertificate()),
- namedtype.NamedType('signatureAlgorithm', AlgorithmIdentifier()),
- namedtype.NamedType('signatureValue', univ.BitString())
- )
-
- def get_version(self):
- info = self.getComponentByName('tbsCertificate')
- version = info.getComponentByName('version')
- return version._value
-
- def get_subject(self):
- info = self.getComponentByName('tbsCertificate')
- return info.getComponentByName('subject')
-
- def get_serial_number(self):
- 'return the serial number as a Python long object'
- info = self.getComponentByName('tbsCertificate')
- return long(info.getComponentByName('serialNumber'))
-
-# end of ASN.1 data structures
-
def strip_header(pem):
"""
Remove the header and footer from a certificate.
@@ -205,69 +37,53 @@ def strip_header(pem):
return pem
-
-def load_certificate(data, type=PEM):
+def load_certificate(data, datatype=PEM, dbdir=None):
"""
Given a base64-encoded certificate, with or without the
header/footer, return a request object.
+
+ Returns a nss.Certificate type
"""
- if (type == PEM):
+ if type(data) in (tuple, list):
+ data = data[0]
+
+ if (datatype == PEM):
data = strip_header(data)
data = base64.b64decode(data)
- return decoder.decode(data, asn1Spec=Certificate())[0]
+ if dbdir is None:
+ if api.env.in_tree:
+ dbdir = api.env.dot_ipa + os.sep + 'alias'
+ else:
+ dbdir = "/etc/httpd/alias"
+
+ nss.nss_init(dbdir)
+ return nss.Certificate(buffer(data))
-def get_subject_components(certificate, type=PEM):
+def get_subject(certificate, datatype=PEM):
"""
Load an X509.3 certificate and get the subject.
-
- Return a tuple of a certificate subject.
- (('CN', u'www.example.com'), ('O', u'IPA'))
"""
- x509cert = load_certificate(certificate, type)
- return x509cert.get_subject().get_components()
+ cert = load_certificate(certificate, datatype)
+ return cert.subject
-def get_serial_number(certificate, type=PEM):
+def get_serial_number(certificate, datatype=PEM):
"""
- Return the serial number of a certificate as a Python long object.
+ Return the decimal value of the serial number.
"""
- x509cert = load_certificate(certificate, type)
- return x509cert.get_serial_number()
+ cert = load_certificate(certificate, datatype)
+ return cert.serial_number
if __name__ == '__main__':
- certType = Certificate()
-
- # Read PEM certs from stdin and print them out in plain text
-
- stSpam, stHam, stDump = 0, 1, 2
- state = stSpam
- certCnt = 0
- for certLine in sys.stdin.readlines():
- certLine = string.strip(certLine)
- if state == stSpam:
- if state == stSpam:
- if certLine == '-----BEGIN CERTIFICATE-----':
- certLines = []
- state = stHam
- continue
- if state == stHam:
- if certLine == '-----END CERTIFICATE-----':
- state = stDump
- else:
- certLines.append(certLine)
- if state == stDump:
- substrate = ''
- for certLine in certLines:
- substrate = substrate + base64.b64decode(certLine)
+ nss.nss_init_nodb()
- cert = decoder.decode(substrate, asn1Spec=certType)[0]
- print cert.prettyPrint()
+ # Read PEM certs from stdin and print out its components
- assert encoder.encode(cert) == substrate, 'cert recode fails'
+ certlines = sys.stdin.readlines()
+ cert = ''.join(certlines)
- certCnt = certCnt + 1
- state = stSpam
+ cert = load_certificate(cert)
- print '*** %s PEM cert(s) de/serialized' % certCnt
+ print cert
diff --git a/ipapython/nsslib.py b/ipapython/nsslib.py
index 1710a7d74..02bff00a8 100644
--- a/ipapython/nsslib.py
+++ b/ipapython/nsslib.py
@@ -188,7 +188,6 @@ class NSPRConnection(httplib.HTTPConnection):
httplib.HTTPConnection.__init__(self, host, port, strict)
logging.debug('%s init %s', self.__class__.__name__, host)
- nss.nss_init_nodb()
self.sock = io.Socket()
def connect(self):