summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xinstall/certmonger/dogtag-ipa-ca-renew-agent-submit22
-rw-r--r--install/restart_scripts/renew_ca_cert6
-rwxr-xr-xinstall/tools/ipa-replica-conncheck5
-rw-r--r--ipaclient/install/client.py13
-rw-r--r--ipaclient/plugins/ca.py15
-rw-r--r--ipaclient/plugins/cert.py7
-rw-r--r--ipaclient/plugins/host.py7
-rw-r--r--ipaclient/plugins/service.py7
-rw-r--r--ipaclient/plugins/user.py7
-rw-r--r--ipaclient/plugins/vault.py6
-rw-r--r--ipalib/install/certstore.py50
-rw-r--r--ipalib/x509.py532
-rw-r--r--ipaplatform/redhat/tasks.py20
-rw-r--r--ipapython/certdb.py106
-rw-r--r--ipaserver/install/ca.py2
-rw-r--r--ipaserver/install/cainstance.py9
-rw-r--r--ipaserver/install/certs.py37
-rw-r--r--ipaserver/install/dogtaginstance.py7
-rw-r--r--ipaserver/install/dsinstance.py10
-rw-r--r--ipaserver/install/httpinstance.py6
-rw-r--r--ipaserver/install/installutils.py17
-rw-r--r--ipaserver/install/ipa_cacert_manage.py45
-rw-r--r--ipaserver/install/ipa_server_certinstall.py6
-rw-r--r--ipaserver/install/krainstance.py3
-rw-r--r--ipaserver/install/plugins/upload_cacrt.py9
-rw-r--r--ipaserver/install/server/upgrade.py2
-rw-r--r--ipaserver/install/service.py7
-rw-r--r--ipaserver/plugins/ca.py4
-rw-r--r--ipaserver/plugins/cert.py13
-rw-r--r--ipaserver/plugins/certmap.py7
-rw-r--r--ipatests/test_ipalib/test_x509.py4
-rw-r--r--ipatests/test_ipaserver/test_ldap.py20
-rw-r--r--ipatests/test_xmlrpc/testcert.py5
33 files changed, 538 insertions, 478 deletions
diff --git a/install/certmonger/dogtag-ipa-ca-renew-agent-submit b/install/certmonger/dogtag-ipa-ca-renew-agent-submit
index 3d3e79144..7d7ff9ae9 100755
--- a/install/certmonger/dogtag-ipa-ca-renew-agent-submit
+++ b/install/certmonger/dogtag-ipa-ca-renew-agent-submit
@@ -31,7 +31,6 @@ import syslog
import traceback
import tempfile
import shutil
-import base64
import contextlib
import json
@@ -266,8 +265,8 @@ def store_cert(**kwargs):
cert = os.environ.get('CERTMONGER_CERTIFICATE')
if not cert:
return (REJECTED, "New certificate requests not supported")
-
- dercert = x509.normalize_certificate(cert)
+ cert = x509.load_pem_x509_certificate(cert)
+ dercert = cert.public_bytes(x509.Encoding.DER)
dn = DN(('cn', nickname), ('cn', 'ca_renewal'),
('cn', 'ipa'), ('cn', 'etc'), api.env.basedn)
@@ -365,7 +364,8 @@ def retrieve_or_reuse_cert(**kwargs):
if not nickname:
return (REJECTED, "Nickname could not be determined")
- cert = os.environ.get('CERTMONGER_CERTIFICATE')
+ cert = x509.load_pem_x509_certificate(
+ fix_pem(os.environ.get('CERTMONGER_CERTIFICATE'))) # TODO: the fix_pem somehow got there early, so making this comment way too long to get rid of it later
if not cert:
return (REJECTED, "New certificate requests not supported")
@@ -378,11 +378,10 @@ def retrieve_or_reuse_cert(**kwargs):
except errors.NotFound:
pass
else:
- cert = entry.single_value['usercertificate']
- cert = base64.b64encode(cert)
- cert = x509.make_pem(cert)
+ cert = x509.load_der_x509_certificate(
+ entry.single_value['usercertificate'])
- return (ISSUED, cert)
+ return (ISSUED, cert.public_bytes(x509.Encoding.PEM))
def retrieve_cert_continuous(reuse_existing, **kwargs):
@@ -392,7 +391,7 @@ def retrieve_cert_continuous(reuse_existing, **kwargs):
"""
old_cert = os.environ.get('CERTMONGER_CERTIFICATE')
if old_cert:
- old_cert = x509.normalize_certificate(old_cert)
+ old_cert = x509.load_pem_x509_certificate(old_cert)
result = call_handler(retrieve_or_reuse_cert,
reuse_existing=reuse_existing,
@@ -400,7 +399,7 @@ def retrieve_cert_continuous(reuse_existing, **kwargs):
if result[0] != ISSUED or reuse_existing:
return result
- new_cert = x509.normalize_certificate(result[1])
+ new_cert = x509.load_pem_x509_certificate(result[1])
if new_cert == old_cert:
syslog.syslog(syslog.LOG_INFO, "Updated certificate not available")
# No cert available yet, tell certmonger to wait another 8 hours
@@ -431,7 +430,8 @@ def renew_ca_cert(reuse_existing, **kwargs):
cert = os.environ.get('CERTMONGER_CERTIFICATE')
if not cert:
return (REJECTED, "New certificate requests not supported")
- is_self_signed = x509.is_self_signed(cert)
+ cert = x509.load_pem_x509_certificate(cert)
+ is_self_signed = cert.is_self_signed()
operation = os.environ.get('CERTMONGER_OPERATION')
if operation == 'SUBMIT':
diff --git a/install/restart_scripts/renew_ca_cert b/install/restart_scripts/renew_ca_cert
index bb31defc0..30716531d 100644
--- a/install/restart_scripts/renew_ca_cert
+++ b/install/restart_scripts/renew_ca_cert
@@ -29,7 +29,7 @@ import traceback
from ipalib.install import certstore
from ipapython import ipautil
-from ipalib import api, errors, x509
+from ipalib import api, errors
from ipalib.install.kinit import kinit_keytab
from ipaserver.install import certs, cainstance, installutils
from ipaserver.plugins.ldap2 import ldap2
@@ -64,7 +64,7 @@ def _main():
# Fetch the new certificate
db = certs.CertDB(api.env.realm, nssdir=paths.PKI_TOMCAT_ALIAS_DIR)
- cert = db.get_cert_from_db(nickname, pem=False)
+ cert = db.get_cert_from_db(nickname)
if not cert:
syslog.syslog(syslog.LOG_ERR, 'No certificate %s found.' % nickname)
sys.exit(1)
@@ -106,7 +106,7 @@ def _main():
cfg_path, 'subsystem.select', '=')
if config == 'New':
syslog.syslog(syslog.LOG_NOTICE, "Updating CS.cfg")
- if x509.is_self_signed(cert, x509.DER):
+ if cert.is_self_signed():
installutils.set_directive(
cfg_path, 'hierarchy.select', 'Root',
quotes=False, separator='=')
diff --git a/install/tools/ipa-replica-conncheck b/install/tools/ipa-replica-conncheck
index 1a50c4b7e..cb08ad3b9 100755
--- a/install/tools/ipa-replica-conncheck
+++ b/install/tools/ipa-replica-conncheck
@@ -47,7 +47,6 @@ from socket import SOCK_STREAM, SOCK_DGRAM
import distutils.spawn
from ipaplatform.paths import paths
import gssapi
-from cryptography.hazmat.primitives import serialization
logger = logging.getLogger(os.path.basename(__file__))
@@ -544,10 +543,8 @@ def main():
ca_certs = x509.load_certificate_list_from_file(
options.ca_cert_file)
for ca_cert in ca_certs:
- data = ca_cert.public_bytes(
- serialization.Encoding.DER)
nss_db.add_cert(
- data,
+ ca_cert,
str(DN(ca_cert.subject)),
certdb.EXTERNAL_CA_TRUST_FLAGS)
diff --git a/ipaclient/install/client.py b/ipaclient/install/client.py
index 856634600..b7293bb29 100644
--- a/ipaclient/install/client.py
+++ b/ipaclient/install/client.py
@@ -27,7 +27,6 @@ import tempfile
import time
import traceback
-from cryptography.hazmat.primitives import serialization
# pylint: disable=import-error
from six.moves.configparser import RawConfigParser
from six.moves.urllib.parse import urlparse, urlunparse
@@ -1647,9 +1646,7 @@ def get_ca_certs_from_ldap(server, basedn, realm):
logger.debug("get_ca_certs_from_ldap() error: %s", e)
raise
- certs = [x509.load_der_x509_certificate(c[0]) for c in certs
- if c[2] is not False]
-
+ certs = [c[0] for c in certs if c[2] is not False]
return certs
@@ -1830,10 +1827,6 @@ def get_ca_certs(fstore, options, server, basedn, realm):
if ca_certs is not None:
try:
- ca_certs = [
- cert.public_bytes(serialization.Encoding.DER)
- for cert in ca_certs
- ]
x509.write_certificate_list(ca_certs, ca_file)
except Exception as e:
if os.path.exists(ca_file):
@@ -2680,10 +2673,6 @@ def _install(options):
# Add CA certs to a temporary NSS database
ca_certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
- ca_certs = [
- cert.public_bytes(serialization.Encoding.DER)
- for cert in ca_certs
- ]
try:
tmp_db.create_db()
diff --git a/ipaclient/plugins/ca.py b/ipaclient/plugins/ca.py
index fe9c55f4c..c48f76ea7 100644
--- a/ipaclient/plugins/ca.py
+++ b/ipaclient/plugins/ca.py
@@ -2,7 +2,6 @@
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
-import base64
from ipaclient.frontend import MethodOverride
from ipalib import errors, util, x509, Str
from ipalib.plugable import Registry
@@ -34,15 +33,15 @@ class WithCertOutArgs(MethodOverride):
result = super(WithCertOutArgs, self).forward(*keys, **options)
if filename:
- def to_pem(x):
- return x509.make_pem(x)
if options.get('chain', False):
- ders = result['result']['certificate_chain']
- data = '\n'.join(to_pem(base64.b64encode(der)) for der in ders)
+ certs = (x509.load_der_x509_certificate(c)
+ for c in result['result']['certificate_chain'])
else:
- data = to_pem(result['result']['certificate'])
- with open(filename, 'wb') as f:
- f.write(data)
+ certs = [
+ x509.load_der_x509_certificate(
+ result['result']['certificate'])
+ ]
+ x509.write_certificate_list(certs, filename)
return result
diff --git a/ipaclient/plugins/cert.py b/ipaclient/plugins/cert.py
index e318c5570..860f92553 100644
--- a/ipaclient/plugins/cert.py
+++ b/ipaclient/plugins/cert.py
@@ -66,10 +66,9 @@ class CertRetrieveOverride(MethodOverride):
certs = result['result']['certificate_chain']
else:
certs = [result['result']['certificate']]
- certs = (x509.ensure_der_format(cert) for cert in certs)
- certs = (x509.make_pem(base64.b64encode(cert)) for cert in certs)
- with open(certificate_out, 'w') as f:
- f.write('\n'.join(certs))
+ certs = (x509.load_der_x509_certificate(
+ x509.ensure_der_format(cert)) for cert in certs)
+ x509.write_certificate_list(certs, certificate_out)
return result
diff --git a/ipaclient/plugins/host.py b/ipaclient/plugins/host.py
index 7d8b92d8e..e3ae7fae6 100644
--- a/ipaclient/plugins/host.py
+++ b/ipaclient/plugins/host.py
@@ -34,10 +34,9 @@ class host_show(MethodOverride):
util.check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- x509.write_certificate_list(
- result['result']['usercertificate'],
- options['out']
- )
+ certs = (x509.load_der_x509_certificate(c)
+ for c in result['result']['usercertificate'])
+ x509.write_certificate_list(certs, options['out'])
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
diff --git a/ipaclient/plugins/service.py b/ipaclient/plugins/service.py
index c45a2f2bc..a48a76dc6 100644
--- a/ipaclient/plugins/service.py
+++ b/ipaclient/plugins/service.py
@@ -36,10 +36,9 @@ class service_show(MethodOverride):
util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- x509.write_certificate_list(
- result['result']['usercertificate'],
- options['out']
- )
+ certs = (x509.load_der_x509_certificate(c)
+ for c in result['result']['usercertificate'])
+ x509.write_certificate_list(certs, options['out'])
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
diff --git a/ipaclient/plugins/user.py b/ipaclient/plugins/user.py
index 19eecac54..5af73b50c 100644
--- a/ipaclient/plugins/user.py
+++ b/ipaclient/plugins/user.py
@@ -67,10 +67,9 @@ class user_show(MethodOverride):
util.check_writable_file(options['out'])
result = super(user_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- x509.write_certificate_list(
- result['result']['usercertificate'],
- options['out']
- )
+ certs = (x509.load_der_x509_certificate(c)
+ for c in result['result']['usercertificate'])
+ x509.write_certificate_list(certs, options['out'])
result['summary'] = (
_('Certificate(s) stored in file \'%(file)s\'')
% dict(file=options['out'])
diff --git a/ipaclient/plugins/vault.py b/ipaclient/plugins/vault.py
index 022c5b7f9..398e4015b 100644
--- a/ipaclient/plugins/vault.py
+++ b/ipaclient/plugins/vault.py
@@ -624,15 +624,15 @@ class vaultconfig_show(MethodOverride):
response = super(vaultconfig_show, self).forward(*args, **options)
# cache transport certificate
- transport_cert = x509.load_certificate(
- response['result']['transport_cert'], x509.DER)
+ transport_cert = x509.load_der_x509_certificate(
+ response['result']['transport_cert'])
_transport_cert_cache.store_cert(
self.api.env.domain, transport_cert
)
if file:
- with open(file, 'w') as f:
+ with open(file, 'wb') as f:
f.write(response['result']['transport_cert'])
return response
diff --git a/ipalib/install/certstore.py b/ipalib/install/certstore.py
index 0d0902fc1..89302f6d8 100644
--- a/ipalib/install/certstore.py
+++ b/ipalib/install/certstore.py
@@ -28,13 +28,13 @@ from ipapython.dn import DN
from ipapython.certdb import get_ca_nickname, TrustFlags
from ipalib import errors, x509
-def _parse_cert(dercert):
+
+def _parse_cert(cert):
try:
- cert = x509.load_der_x509_certificate(dercert)
subject = DN(cert.subject)
issuer = DN(cert.issuer)
serial_number = cert.serial_number
- public_key_info = x509.get_der_public_key_info(dercert, x509.DER)
+ public_key_info = cert.public_key_info_bytes
except (ValueError, PyAsn1Error) as e:
raise ValueError("failed to decode certificate: %s" % e)
@@ -45,15 +45,15 @@ def _parse_cert(dercert):
return subject, issuer_serial, public_key_info
-def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
+def init_ca_entry(entry, cert, nickname, trusted, ext_key_usage):
"""
Initialize certificate store entry for a CA certificate.
"""
- subject, issuer_serial, public_key = _parse_cert(dercert)
+ subject, issuer_serial, public_key = _parse_cert(cert)
if ext_key_usage is not None:
try:
- cert_eku = x509.get_ext_key_usage(dercert, x509.DER)
+ cert_eku = cert.extended_key_usage
except ValueError as e:
raise ValueError("failed to decode certificate: %s" % e)
if cert_eku is not None:
@@ -68,7 +68,7 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
entry['ipaCertSubject'] = [subject]
entry['ipaCertIssuerSerial'] = [issuer_serial]
entry['ipaPublicKey'] = [public_key]
- entry['cACertificate;binary'] = [dercert]
+ entry['cACertificate;binary'] = [cert.public_bytes(x509.Encoding.DER)]
if trusted is not None:
entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted']
@@ -79,11 +79,12 @@ def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
entry['ipaKeyExtUsage'] = ext_key_usage
-def update_compat_ca(ldap, base_dn, dercert):
+def update_compat_ca(ldap, base_dn, cert):
"""
Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX.
"""
dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn)
+ dercert = cert.public_bytes(x509.Encoding.DER)
try:
entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary'])
entry.single_value['cACertificate;binary'] = dercert
@@ -152,12 +153,12 @@ def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
-def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
+def update_ca_cert(ldap, base_dn, cert, trusted=None, ext_key_usage=None,
config_ipa=False, config_compat=False):
"""
Update existing entry for a CA certificate in the certificate store.
"""
- subject, issuer_serial, public_key = _parse_cert(dercert)
+ subject, issuer_serial, public_key = _parse_cert(cert)
filter = ldap.make_filter({'ipaCertSubject': subject})
result, _truncated = ldap.find_entries(
@@ -172,7 +173,7 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
for old_cert in entry['cACertificate;binary']:
# Check if we are adding a new cert
- if old_cert == dercert:
+ if old_cert == cert:
break
else:
# We are adding a new cert, validate it
@@ -181,7 +182,8 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
if entry.single_value['ipaPublicKey'] != public_key:
raise ValueError("subject public key info mismatch")
entry['ipaCertIssuerSerial'].append(issuer_serial)
- entry['cACertificate;binary'].append(dercert)
+ entry['cACertificate;binary'].append(
+ cert.public_bytes(x509.Encoding.DER))
# Update key trust
if trusted is not None:
@@ -217,22 +219,24 @@ def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
entry.setdefault('ipaConfigString', []).append('compatCA')
if is_compat or config_compat:
- update_compat_ca(ldap, base_dn, dercert)
+ update_compat_ca(ldap, base_dn, cert)
ldap.update_entry(entry)
clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
-def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
+def put_ca_cert(ldap, base_dn, cert, nickname, trusted=None,
ext_key_usage=None, config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
+
+ :param cert: IPACertificate
"""
try:
- update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage,
+ update_ca_cert(ldap, base_dn, cert, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.NotFound:
- add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ add_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa=config_ipa, config_compat=config_compat)
except errors.EmptyModlist:
pass
@@ -306,11 +310,12 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
for cert in entry.get('cACertificate;binary', []):
try:
- _parse_cert(cert)
+ cert_obj = x509.load_der_x509_certificate(cert)
+ _parse_cert(cert_obj)
except ValueError:
certs = []
break
- certs.append((cert, nickname, trusted, ext_key_usage))
+ certs.append((cert_obj, nickname, trusted, ext_key_usage))
except errors.NotFound:
try:
ldap.get_entry(container_dn, [''])
@@ -319,7 +324,8 @@ def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
dn = DN(('cn', 'CAcert'), config_dn)
entry = ldap.get_entry(dn, ['cACertificate;binary'])
- cert = entry.single_value['cACertificate;binary']
+ cert = x509.load_der_x509_certificate(
+ entry.single_value['cACertificate;binary'])
try:
subject, _issuer_serial, _public_key_info = _parse_cert(cert)
except ValueError:
@@ -354,16 +360,18 @@ def key_policy_to_trust_flags(trusted, ca, ext_key_usage):
return TrustFlags(False, trusted, ca, ext_key_usage)
-def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags,
+def put_ca_cert_nss(ldap, base_dn, cert, nickname, trust_flags,
config_ipa=False, config_compat=False):
"""
Add or update entry for a CA certificate in the certificate store.
+
+ :param cert: IPACertificate
"""
trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags)
if ca is False:
raise ValueError("must be CA certificate")
- put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ put_ca_cert(ldap, base_dn, cert, nickname, trusted, ext_key_usage,
config_ipa, config_compat)
diff --git a/ipalib/x509.py b/ipalib/x509.py
index 5bec5ae59..dc7a2eb3c 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -39,10 +39,15 @@ import ssl
import base64
import re
+from cryptography import x509 as crypto_x509
+from cryptography import utils as crypto_utils
from cryptography.hazmat.backends import default_backend
-import cryptography.x509
+from cryptography.hazmat.primitives.serialization import (
+ Encoding, PublicFormat
+)
from pyasn1.type import univ, char, namedtype, tag
from pyasn1.codec.der import decoder, encoder
+# from pyasn1.codec.native import decoder, encoder
from pyasn1_modules import rfc2315, rfc2459
import six
@@ -101,26 +106,319 @@ def strip_header(pem):
return pem
+@crypto_utils.register_interface(crypto_x509.Certificate)
+class IPACertificate(object):
+ """
+ A proxy class wrapping a python-cryptography certificate representation for
+ FreeIPA purposes
+ """
+ def __init__(self, cert, backend=None):
+ """
+ :param cert: A python-cryptography Certificate object
+ :param backend: A python-cryptography Backend object
+ """
+ self._cert = cert
+ self.backend = default_backend() if backend is None else backend()
+
+ # initialize the certificate fields
+ # we have to do it this way so that some systems don't explode since
+ # some field types encode-decoding is not strongly defined
+ self._subject = self.__get_der_field('subject')
+ self._issuer = self.__get_der_field('issuer')
+
+ def __getstate__(self):
+ state = {
+ '_cert': self.public_bytes(Encoding.DER),
+ '_subject': self.subject_bytes,
+ '_issuer': self.issuer_bytes,
+ }
+ return state
+
+ def __setstate__(self, state):
+ self._subject = state['_subject']
+ self._issuer = state['_issuer']
+ self._cert = crypto_x509.load_der_x509_certificate(
+ state['_cert'], backend=default_backend())
+
+ def __eq__(self, other):
+ """
+ Checks equality.
+
+ :param other: either cryptography.Certificate or IPACertificate or
+ bytes representing a DER-formatted certificate
+ """
+ if (isinstance(other, (crypto_x509.Certificate, IPACertificate))):
+ return (self.public_bytes(Encoding.DER) ==
+ other.public_bytes(Encoding.DER))
+ elif isinstance(other, bytes):
+ return self.public_bytes(Encoding.DER) == other
+ else:
+ return False
+
+ def __ne__(self, other):
+ """
+ Checks not equal.
+ """
+ return not self.__eq__(other)
+
+ def __hash__(self):
+ """
+ Computes a hash of the wrapped cryptography.Certificate.
+ """
+ return hash(self._cert)
+
+ def __encode_extension(self, oid, critical, value):
+ # TODO: have another proxy for crypto_x509.Extension which would
+ # provide public_bytes on the top of what python-cryptography has
+ ext = rfc2459.Extension()
+ # TODO: this does not have to be so weird, pyasn1 now has codecs
+ # which are capable of providing python-native types
+ ext['extnID'] = univ.ObjectIdentifier(oid)
+ ext['critical'] = univ.Boolean(critical)
+ ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
+ ext = encoder.encode(ext)
+ return ext
+
+ def __get_pyasn1_field(self, field):
+ """
+ :returns: a field of the certificate in pyasn1 representation
+ """
+ cert_bytes = self.tbs_certificate_bytes
+ cert = decoder.decode(cert_bytes, rfc2459.TBSCertificate())[0]
+ field = cert[field]
+ return field
+
+ def __get_der_field(self, field):
+ """
+ :field: the name of the field of the certificate
+ :returns: bytes representing the value of a certificate field
+ """
+ return encoder.encode(self.__get_pyasn1_field(field))
+
+ def public_bytes(self, encoding):
+ """
+ Serializes the certificate to PEM or DER format.
+ """
+ return self._cert.public_bytes(encoding)
+
+ def is_self_signed(self):
+ """
+ :returns: True if this certificate is self-signed, False otherwise
+ """
+ return self._cert.issuer == self._cert.subject
+
+ def fingerprint(self, algorithm):
+ """
+ Counts fingerprint of the wrapped cryptography.Certificate
+ """
+ return self._cert.fingerprint(algorithm)
+
+ @property
+ def serial_number(self):
+ return self._cert.serial_number
+
+ @property
+ def version(self):
+ return self._cert.version
+
+ @property
+ def subject(self):
+ return self._cert.subject
+
+ @property
+ def subject_bytes(self):
+ return self._subject
+
+ @property
+ def signature_hash_algorithm(self):
+ """
+ Returns a HashAlgorithm corresponding to the type of the digest signed
+ in the certificate.
+ """
+ return self._cert.signature_hash_algorithm
+
+ @property
+ def signature_algorithm_oid(self):
+ """
+ Returns the ObjectIdentifier of the signature algorithm.
+ """
+ return self._cert.signature_algorithm_oid
+
+ @property
+ def signature(self):
+ """
+ Returns the signature bytes.
+ """
+ return self._cert.signature
+
+ @property
+ def issuer(self):
+ return self._cert.issuer
+
+ @property
+ def issuer_bytes(self):
+ return self._issuer
+
+ @property
+ def not_valid_before(self):
+ return self._cert.not_valid_before
+
+ @property
+ def not_valid_after(self):
+ return self._cert.not_valid_after
+
+ @property
+ def tbs_certificate_bytes(self):
+ return self._cert.tbs_certificate_bytes
+
+ @property
+ def extensions(self):
+ # TODO: own Extension and Extensions classes proxying
+ # python-cryptography
+ return self._cert.extensions
+
+ def public_key(self):
+ return self._cert.public_key()
+
+ @property
+ def public_key_info_bytes(self):
+ return self._cert.public_key().public_bytes(
+ encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo)
+
+ @property
+ def extended_key_usage(self):
+ try:
+ ext_key_usage = self._cert.extensions.get_extension_for_oid(
+ crypto_x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
+ except crypto_x509.ExtensionNotFound:
+ return None
+
+ return set(oid.dotted_string for oid in ext_key_usage)
+
+ @property
+ def extended_key_usage_bytes(self):
+ ekurfc = rfc2459.ExtKeyUsageSyntax()
+ eku = self.extended_key_usage or {EKU_PLACEHOLDER}
+ for i, oid in enumerate(eku):
+ ekurfc[i] = univ.ObjectIdentifier(oid)
+ ekurfc = encoder.encode(ekurfc)
+ return self.__encode_extension('2.5.29.37', EKU_ANY not in eku, ekurfc)
+
+ @property
+ def san_general_names(self):
+ """
+ Return SAN general names from a python-cryptography
+ certificate object. If the SAN extension is not present,
+ return an empty sequence.
+
+ Because python-cryptography does not yet provide a way to
+ handle unrecognised critical extensions (which may occur),
+ we must parse the certificate and extract the General Names.
+ For uniformity with other code, we manually construct values
+ of python-crytography GeneralName subtypes.
+
+ python-cryptography does not yet provide types for
+ ediPartyName or x400Address, so we drop these name types.
+
+ otherNames are NOT instantiated to more specific types where
+ the type is known. Use ``process_othernames`` to do that.
+
+ When python-cryptography can handle certs with unrecognised
+ critical extensions and implements ediPartyName and
+ x400Address, this function (and helpers) will be redundant
+ and should go away.
+
+ """
+ gns = self.__pyasn1_get_san_general_names()
+
+ GENERAL_NAME_CONSTRUCTORS = {
+ 'rfc822Name': lambda x: crypto_x509.RFC822Name(unicode(x)),
+ 'dNSName': lambda x: crypto_x509.DNSName(unicode(x)),
+ 'directoryName': _pyasn1_to_cryptography_directoryname,
+ 'registeredID': _pyasn1_to_cryptography_registeredid,
+ 'iPAddress': _pyasn1_to_cryptography_ipaddress,
+ 'uniformResourceIdentifier':
+ lambda x: crypto_x509.UniformResourceIdentifier(unicode(x)),
+ 'otherName': _pyasn1_to_cryptography_othername,
+ }
+
+ result = []
+
+ for gn in gns:
+ gn_type = gn.getName()
+ if gn_type in GENERAL_NAME_CONSTRUCTORS:
+ result.append(
+ GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
+
+ return result
+
+ def __pyasn1_get_san_general_names(self):
+ # pyasn1 returns None when the key is not present in the certificate
+ # but we need an iterable
+ extensions = self.__get_pyasn1_field('extensions') or []
+ OID_SAN = univ.ObjectIdentifier('2.5.29.17')
+ gns = []
+ for ext in extensions:
+ if ext['extnID'] == OID_SAN:
+ der = decoder.decode(
+ ext['extnValue'], asn1Spec=univ.OctetString())[0]
+ gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
+ break
+ return gns
+
+ @property
+ def san_a_label_dns_names(self):
+ gns = self.__pyasn1_get_san_general_names()
+ result = []
+
+ for gn in gns:
+ if gn.getName() == 'dNSName':
+ result.append(unicode(gn.getComponent()))
+
+ return result
+
+ def match_hostname(self, hostname):
+ match_cert = {}
+
+ match_cert['subject'] = match_subject = []
+ for rdn in self._cert.subject.rdns:
+ match_rdn = []
+ for ava in rdn:
+ if ava.oid == crypto_x509.oid.NameOID.COMMON_NAME:
+ match_rdn.append(('commonName', ava.value))
+ match_subject.append(match_rdn)
+
+ values = self.san_a_label_dns_names
+ if values:
+ match_cert['subjectAltName'] = match_san = []
+ for value in values:
+ match_san.append(('DNS', value))
+
+ ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
+
+
def load_pem_x509_certificate(data):
"""
Load an X.509 certificate in PEM format.
- :returns: a python-cryptography ``Certificate`` object.
+ :returns: a ``IPACertificate`` object.
:raises: ``ValueError`` if unable to load the certificate.
"""
- return crypto_x509.load_pem_x509_certificate(data,
- backend=default_backend())
+ return IPACertificate(
+ crypto_x509.load_pem_x509_certificate(data, backend=default_backend())
+ )
def load_der_x509_certificate(data):
"""
Load an X.509 certificate in DER format.
- :returns: a python-cryptography ``Certificate`` object.
+ :returns: a ``IPACertificate`` object.
:raises: ``ValueError`` if unable to load the certificate.
"""
- return crypto_x509.load_der_x509_certificate(data,
- backend=default_backend())
+ return IPACertificate(
+ crypto_x509.load_der_x509_certificate(data, backend=default_backend())
+ )
def load_certificate_from_file(filename, dbdir=None):
@@ -138,7 +436,6 @@ def load_certificate_list(data):
Load a certificate list from a sequence of concatenated PEMs.
Return a list of python-cryptography ``Certificate`` objects.
-
"""
certs = PEM_REGEX.findall(data)
return [load_pem_x509_certificate(cert) for cert in certs]
@@ -155,11 +452,11 @@ def load_certificate_list_from_file(filename):
return load_certificate_list(f.read())
-def pkcs7_to_pems(data, datatype=PEM):
+def pkcs7_to_certs(data, datatype=PEM):
"""
Extract certificates from a PKCS #7 object.
- Return a ``list`` of X.509 PEM strings.
+ :returns: a ``list`` of ``IPACertificate`` objects.
"""
if datatype == PEM:
match = re.match(
@@ -187,62 +484,12 @@ def pkcs7_to_pems(data, datatype=PEM):
for certificate in signed_data['certificates']:
certificate = encoder.encode(certificate)
- certificate = base64.b64encode(certificate)
- certificate = make_pem(certificate)
+ certificate = load_der_x509_certificate(certificate)
result.append(certificate)
return result
-def is_self_signed(certificate, datatype=PEM):
- cert = load_certificate(certificate, datatype)
- return cert.issuer == cert.subject
-
-
-def _get_der_field(cert, datatype, dbdir, field):
- cert = normalize_certificate(cert)
- cert = decoder.decode(cert, rfc2459.Certificate())[0]
- field = cert['tbsCertificate'][field]
- field = encoder.encode(field)
- return field
-
-def get_der_subject(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'subject')
-
-def get_der_issuer(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'issuer')
-
-def get_der_serial_number(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'serialNumber')
-
-def get_der_public_key_info(cert, datatype=PEM, dbdir=None):
- return _get_der_field(cert, datatype, dbdir, 'subjectPublicKeyInfo')
-
-
-def get_ext_key_usage(certificate, datatype=PEM):
- cert = load_certificate(certificate, datatype)
- try:
- eku = cert.extensions.get_extension_for_oid(
- cryptography.x509.oid.ExtensionOID.EXTENDED_KEY_USAGE).value
- except cryptography.x509.ExtensionNotFound:
- return None
-
- return set(oid.dotted_string for oid in eku)
-
-
-def make_pem(data):
- """
- Convert a raw base64-encoded blob into something that looks like a PE
- file with lines split to 64 characters and proper headers.
- """
- if isinstance(data, bytes):
- data = data.decode('ascii')
- pemcert = '\r\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- pemcert + \
- '\n-----END CERTIFICATE-----'
-
-
def ensure_der_format(rawcert):
"""
Incoming certificates should be DER-encoded. If not it is converted to
@@ -254,8 +501,6 @@ def ensure_der_format(rawcert):
if not rawcert:
return None
- rawcert = strip_header(rawcert)
-
try:
if isinstance(rawcert, bytes):
# base64 must work with utf-8, otherwise it is raw bin certificate
@@ -299,58 +544,37 @@ def validate_der_x509_certificate(cert):
raise errors.CertificateFormatError(error=str(e))
-def write_certificate(rawcert, filename):
+def write_certificate(cert, filename):
"""
Write the certificate to a file in PEM format.
The cert value can be either DER or PEM-encoded, it will be normalized
to DER regardless, then back out to PEM.
"""
- dercert = normalize_certificate(rawcert)
try:
- fp = open(filename, 'w')
- fp.write(make_pem(base64.b64encode(dercert)))
- fp.close()
+ with open(filename, 'wb') as fp:
+ fp.write(cert.public_bytes(Encoding.PEM))
except (IOError, OSError) as e:
raise errors.FileError(reason=str(e))
-def write_certificate_list(rawcerts, filename):
+
+def write_certificate_list(certs, filename):
"""
Write a list of certificates to a file in PEM format.
- The cert values can be either DER or PEM-encoded, they will be normalized
- to DER regardless, then back out to PEM.
+ :param certs: a list of IPACertificate objects to be written to a file
+ :param filename: a path to the file the certificates should be written into
"""
- dercerts = [normalize_certificate(rawcert) for rawcert in rawcerts]
try:
- with open(filename, 'w') as f:
- for cert in dercerts:
- cert = base64.b64encode(cert)
- cert = make_pem(cert)
- f.write(cert + '\n')
+ with open(filename, 'wb') as f:
+ for cert in certs:
+ f.write(cert.public_bytes(Encoding.PEM))
except (IOError, OSError) as e:
raise errors.FileError(reason=str(e))
-def _encode_extension(oid, critical, value):
- ext = rfc2459.Extension()
- ext['extnID'] = univ.ObjectIdentifier(oid)
- ext['critical'] = univ.Boolean(critical)
- ext['extnValue'] = univ.Any(encoder.encode(univ.OctetString(value)))
- ext = encoder.encode(ext)
- return ext
-
-
-def encode_ext_key_usage(ext_key_usage):
- eku = rfc2459.ExtKeyUsageSyntax()
- for i, oid in enumerate(ext_key_usage):
- eku[i] = univ.ObjectIdentifier(oid)
- eku = encoder.encode(eku)
- return _encode_extension('2.5.29.37', EKU_ANY not in ext_key_usage, eku)
-
-
class _PrincipalName(univ.Sequence):
componentType = namedtype.NamedTypes(
namedtype.NamedType('name-type', univ.Integer().subtype(
@@ -385,13 +609,13 @@ def _decode_krb5principalname(data):
return name
-class KRB5PrincipalName(cryptography.x509.general_name.OtherName):
+class KRB5PrincipalName(crypto_x509.general_name.OtherName):
def __init__(self, type_id, value):
super(KRB5PrincipalName, self).__init__(type_id, value)
self.name = _decode_krb5principalname(value)
-class UPN(cryptography.x509.general_name.OtherName):
+class UPN(crypto_x509.general_name.OtherName):
def __init__(self, type_id, value):
super(UPN, self).__init__(type_id, value)
self.name = unicode(
@@ -411,128 +635,48 @@ def process_othernames(gns):
"""
for gn in gns:
- if isinstance(gn, cryptography.x509.general_name.OtherName):
+ if isinstance(gn, crypto_x509.general_name.OtherName):
cls = OTHERNAME_CLASS_MAP.get(
gn.type_id.dotted_string,
- cryptography.x509.general_name.OtherName)
+ crypto_x509.general_name.OtherName)
yield cls(gn.type_id, gn.value)
else:
yield gn
-def _pyasn1_get_san_general_names(cert):
- tbs = decoder.decode(
- cert.tbs_certificate_bytes,
- asn1Spec=rfc2459.TBSCertificate()
- )[0]
- OID_SAN = univ.ObjectIdentifier('2.5.29.17')
- # One would expect KeyError or empty iterable when the key ('extensions'
- # in this particular case) is not pressent in the certificate but pyasn1
- # returns None here
- extensions = tbs['extensions'] or []
- gns = []
- for ext in extensions:
- if ext['extnID'] == OID_SAN:
- der = decoder.decode(
- ext['extnValue'], asn1Spec=univ.OctetString())[0]
- gns = decoder.decode(der, asn1Spec=rfc2459.SubjectAltName())[0]
- break
-
- return gns
-
-
-def get_san_general_names(cert):
- """
- Return SAN general names from a python-cryptography
- certificate object. If the SAN extension is not present,
- return an empty sequence.
-
- Because python-cryptography does not yet provide a way to
- handle unrecognised critical extensions (which may occur),
- we must parse the certificate and extract the General Names.
- For uniformity with other code, we manually construct values
- of python-crytography GeneralName subtypes.
-
- python-cryptography does not yet provide types for
- ediPartyName or x400Address, so we drop these name types.
-
- otherNames are NOT instantiated to more specific types where
- the type is known. Use ``process_othernames`` to do that.
-
- When python-cryptography can handle certs with unrecognised
- critical extensions and implements ediPartyName and
- x400Address, this function (and helpers) will be redundant
- and should go away.
-
- """
- gns = _pyasn1_get_san_general_names(cert)
-
- GENERAL_NAME_CONSTRUCTORS = {
- 'rfc822Name': lambda x: cryptography.x509.RFC822Name(unicode(x)),
- 'dNSName': lambda x: cryptography.x509.DNSName(unicode(x)),
- 'directoryName': _pyasn1_to_cryptography_directoryname,
- 'registeredID': _pyasn1_to_cryptography_registeredid,
- 'iPAddress': _pyasn1_to_cryptography_ipaddress,
- 'uniformResourceIdentifier':
- lambda x: cryptography.x509.UniformResourceIdentifier(unicode(x)),
- 'otherName': _pyasn1_to_cryptography_othername,
- }
-
- result = []
-
- for gn in gns:
- gn_type = gn.getName()
- if gn_type in GENERAL_NAME_CONSTRUCTORS:
- result.append(
- GENERAL_NAME_CONSTRUCTORS[gn_type](gn.getComponent()))
-
- return result
-
-
def _pyasn1_to_cryptography_directoryname(dn):
attrs = []
# Name is CHOICE { RDNSequence } (only one possibility)
for rdn in dn.getComponent():
for ava in rdn:
- attr = cryptography.x509.NameAttribute(
+ attr = crypto_x509.NameAttribute(
_pyasn1_to_cryptography_oid(ava['type']),
unicode(decoder.decode(ava['value'])[0])
)
attrs.append(attr)
- return cryptography.x509.DirectoryName(cryptography.x509.Name(attrs))
+ return crypto_x509.DirectoryName(crypto_x509.Name(attrs))
def _pyasn1_to_cryptography_registeredid(oid):
- return cryptography.x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
+ return crypto_x509.RegisteredID(_pyasn1_to_cryptography_oid(oid))
def _pyasn1_to_cryptography_ipaddress(octet_string):
- return cryptography.x509.IPAddress(
+ return crypto_x509.IPAddress(
ipaddress.ip_address(bytes(octet_string)))
def _pyasn1_to_cryptography_othername(on):
- return cryptography.x509.OtherName(
+ return crypto_x509.OtherName(
_pyasn1_to_cryptography_oid(on['type-id']),
bytes(on['value'])
)
def _pyasn1_to_cryptography_oid(oid):
- return cryptography.x509.ObjectIdentifier(str(oid))
-
-
-def get_san_a_label_dns_names(cert):
- gns = _pyasn1_get_san_general_names(cert)
- result = []
-
- for gn in gns:
- if gn.getName() == 'dNSName':
- result.append(unicode(gn.getComponent()))
-
- return result
+ return crypto_x509.ObjectIdentifier(str(oid))
def chunk(size, s):
@@ -574,23 +718,3 @@ def format_datetime(t):
if t.tzinfo is None:
t = t.replace(tzinfo=UTC())
return unicode(t.strftime("%a %b %d %H:%M:%S %Y %Z"))
-
-
-def match_hostname(cert, hostname):
- match_cert = {}
-
- match_cert['subject'] = match_subject = []
- for rdn in cert.subject.rdns:
- match_rdn = []
- for ava in rdn:
- if ava.oid == cryptography.x509.oid.NameOID.COMMON_NAME:
- match_rdn.append(('commonName', ava.value))
- match_subject.append(match_rdn)
-
- values = get_san_a_label_dns_names(cert)
- if values:
- match_cert['subjectAltName'] = match_san = []
- for value in values:
- match_san.append(('DNS', value))
-
- ssl.match_hostname(match_cert, DNSName(hostname).ToASCII())
diff --git a/ipaplatform/redhat/tasks.py b/ipaplatform/redhat/tasks.py
index 3352502db..cc52c6c5f 100644
--- a/ipaplatform/redhat/tasks.py
+++ b/ipaplatform/redhat/tasks.py
@@ -30,7 +30,6 @@ import os
import pwd
import shutil
import socket
-import base64
import traceback
import errno
@@ -266,10 +265,10 @@ class RedHatTaskNamespace(BaseTaskNamespace):
has_eku = set()
for cert, nickname, trusted, ext_key_usage in ca_certs:
try:
- subject = x509.get_der_subject(cert, x509.DER)
- issuer = x509.get_der_issuer(cert, x509.DER)
- serial_number = x509.get_der_serial_number(cert, x509.DER)
- public_key_info = x509.get_der_public_key_info(cert, x509.DER)
+ subject = cert.subject_bytes
+ issuer = cert.issuer_bytes
+ serial_number = cert.serial_number
+ public_key_info = cert.public_key_info_bytes
except (PyAsn1Error, ValueError, CertificateError) as e:
logger.warning(
"Failed to decode certificate \"%s\": %s", nickname, e)
@@ -278,12 +277,9 @@ class RedHatTaskNamespace(BaseTaskNamespace):
label = urllib.parse.quote(nickname)
subject = urllib.parse.quote(subject)
issuer = urllib.parse.quote(issuer)
- serial_number = urllib.parse.quote(serial_number)
+ serial_number = urllib.parse.quote(str(serial_number))
public_key_info = urllib.parse.quote(public_key_info)
- cert = base64.b64encode(cert)
- cert = x509.make_pem(cert)
-
obj = ("[p11-kit-object-v1]\n"
"class: certificate\n"
"certificate-type: x-509\n"
@@ -302,14 +298,12 @@ class RedHatTaskNamespace(BaseTaskNamespace):
obj += "trusted: true\n"
elif trusted is False:
obj += "x-distrusted: true\n"
- obj += "%s\n\n" % cert
+ obj += "{pem}\n\n".format(pem=cert.public_bytes(x509.Encoding.PEM))
f.write(obj)
if ext_key_usage is not None and public_key_info not in has_eku:
- if not ext_key_usage:
- ext_key_usage = {x509.EKU_PLACEHOLDER}
try:
- ext_key_usage = x509.encode_ext_key_usage(ext_key_usage)
+ ext_key_usage = cert.extended_key_usage_bytes
except PyAsn1Error as e:
logger.warning(
"Failed to encode extended key usage for \"%s\": %s",
diff --git a/ipapython/certdb.py b/ipapython/certdb.py
index a2dfac08c..32f383d38 100644
--- a/ipapython/certdb.py
+++ b/ipapython/certdb.py
@@ -27,9 +27,7 @@ import re
import tempfile
from tempfile import NamedTemporaryFile
import shutil
-import base64
-from cryptography.hazmat.primitives import serialization
import cryptography.x509
from ipapython.dn import DN
@@ -91,7 +89,7 @@ def find_cert_from_txt(cert, start=0):
trailing text, pull out just the certificate part. This will return
the FIRST cert in a stream of data.
- Returns a tuple (certificate, last position in cert)
+ :returns: a tuple (IPACertificate, last position in cert)
"""
s = cert.find('-----BEGIN CERTIFICATE-----', start)
e = cert.find('-----END CERTIFICATE-----', s)
@@ -101,7 +99,7 @@ def find_cert_from_txt(cert, start=0):
if s < 0 or e < 0:
raise RuntimeError("Unable to find certificate")
- cert = cert[s:e]
+ cert = x509.load_pem_x509_certificate(cert[s:e].encode('utf-8'))
return (cert, e)
@@ -182,14 +180,10 @@ def unparse_trust_flags(trust_flags):
def verify_kdc_cert_validity(kdc_cert, ca_certs, realm):
- pem_kdc_cert = kdc_cert.public_bytes(serialization.Encoding.PEM)
- pem_ca_certs = '\n'.join(
- cert.public_bytes(serialization.Encoding.PEM) for cert in ca_certs)
-
with NamedTemporaryFile() as kdc_file, NamedTemporaryFile() as ca_file:
- kdc_file.write(pem_kdc_cert)
+ kdc_file.write(kdc_cert.public_bytes(x509.Encoding.PEM))
kdc_file.flush()
- ca_file.write(pem_ca_certs)
+ x509.write_certificate_list(ca_certs, ca_file.name)
ca_file.flush()
try:
@@ -209,7 +203,7 @@ def verify_kdc_cert_validity(kdc_cert, ca_certs, realm):
raise ValueError("invalid for a KDC")
principal = str(Principal(['krbtgt', realm], realm))
- gns = x509.process_othernames(x509.get_san_general_names(kdc_cert))
+ gns = x509.process_othernames(kdc_cert.san_general_names)
for gn in gns:
if isinstance(gn, x509.KRB5PrincipalName) and gn.name == principal:
break
@@ -458,7 +452,7 @@ class NSSDatabase(object):
if label in ('CERTIFICATE', 'X509 CERTIFICATE',
'X.509 CERTIFICATE'):
try:
- x509.load_pem_x509_certificate(match.group(2))
+ cert = x509.load_pem_x509_certificate(body)
except ValueError as e:
if label != 'CERTIFICATE':
logger.warning(
@@ -467,13 +461,13 @@ class NSSDatabase(object):
filename, line, e)
continue
else:
- extracted_certs.append(body)
+ extracted_certs.append(cert)
loaded = True
continue
if label in ('PKCS7', 'PKCS #7 SIGNED DATA', 'CERTIFICATE'):
try:
- certs = x509.pkcs7_to_pems(body)
+ certs = x509.pkcs7_to_certs(body)
except ipautil.CalledProcessError as e:
if label == 'CERTIFICATE':
logger.warning(
@@ -521,7 +515,7 @@ class NSSDatabase(object):
filename, line, e)
continue
else:
- extracted_key = result.output
+ extracted_key = result.raw_output
key_file = filename
loaded = True
continue
@@ -531,12 +525,11 @@ class NSSDatabase(object):
# Try to load the file as DER certificate
try:
- x509.load_der_x509_certificate(data)
+ cert = x509.load_der_x509_certificate(data)
except ValueError:
pass
else:
- data = x509.make_pem(base64.b64encode(data))
- extracted_certs.append(data)
+ extracted_certs.append(cert)
continue
# Try to import the file as PKCS#12 file
@@ -576,34 +569,34 @@ class NSSDatabase(object):
raise RuntimeError(
"No server certificates found in %s" % (', '.join(files)))
- for cert_pem in extracted_certs:
- cert = x509.load_pem_x509_certificate(cert_pem)
+ for cert in extracted_certs:
nickname = str(DN(cert.subject))
- data = cert.public_bytes(serialization.Encoding.DER)
- self.add_cert(data, nickname, EMPTY_TRUST_FLAGS)
+ self.add_cert(cert, nickname, EMPTY_TRUST_FLAGS)
if extracted_key:
- in_file = ipautil.write_tmp_file(
- '\n'.join(extracted_certs) + '\n' + extracted_key)
- out_file = tempfile.NamedTemporaryFile()
- out_password = ipautil.ipa_generate_password()
- out_pwdfile = ipautil.write_tmp_file(out_password)
- args = [
- OPENSSL, 'pkcs12',
- '-export',
- '-in', in_file.name,
- '-out', out_file.name,
- '-passin', 'file:' + self.pwd_file,
- '-passout', 'file:' + out_pwdfile.name,
- ]
- try:
- ipautil.run(args)
- except ipautil.CalledProcessError as e:
- raise RuntimeError(
- "No matching certificate found for private key from %s" %
- key_file)
+ with tempfile.NamedTemporaryFile(), tempfile.NamedTemporaryFile() \
+ as (in_file, out_file):
+ x509.write_certificate_list(extracted_certs, in_file.name)
+ in_file.write(extracted_key)
+ in_file.flush()
+ out_password = ipautil.ipa_generate_password()
+ out_pwdfile = ipautil.write_tmp_file(out_password)
+ args = [
+ OPENSSL, 'pkcs12',
+ '-export',
+ '-in', in_file.name,
+ '-out', out_file.name,
+ '-passin', 'file:' + self.pwd_file,
+ '-passout', 'file:' + out_pwdfile.name,
+ ]
+ try:
+ ipautil.run(args)
+ except ipautil.CalledProcessError as e:
+ raise RuntimeError(
+ "No matching certificate found for private key from "
+ "%s" % key_file)
- self.import_pkcs12(out_file.name, out_password)
+ self.import_pkcs12(out_file.name, out_password)
def trust_root_cert(self, root_nickname, trust_flags):
if root_nickname[:7] == "Builtin":
@@ -619,17 +612,13 @@ class NSSDatabase(object):
raise RuntimeError(
"Setting trust on %s failed" % root_nickname)
- def get_cert(self, nickname, pem=False):
+ def get_cert(self, nickname):
args = ['-L', '-n', nickname, '-a']
try:
result = self.run_certutil(args, capture_output=True)
except ipautil.CalledProcessError:
raise RuntimeError("Failed to get %s" % nickname)
- cert = result.output
- if not pem:
- cert, _start = find_cert_from_txt(cert, start=0)
- cert = x509.strip_header(cert)
- cert = base64.b64decode(cert)
+ cert, _start = find_cert_from_txt(result.output, start=0)
return cert
def has_nickname(self, nickname):
@@ -643,9 +632,9 @@ class NSSDatabase(object):
def export_pem_cert(self, nickname, location):
"""Export the given cert to PEM file in the given location"""
- cert = self.get_cert(nickname, pem=True)
- with open(location, "w+") as fd:
- fd.write(cert)
+ cert = self.get_cert(nickname)
+ with open(location, "wb") as fd:
+ fd.write(cert.public_bytes(x509.Encoding.PEM))
os.chmod(location, 0o444)
def import_pem_cert(self, nickname, flags, location):
@@ -662,7 +651,7 @@ class NSSDatabase(object):
)
cert, st = find_cert_from_txt(certs)
- self.add_cert(cert, nickname, flags, pem=True)
+ self.add_cert(cert, nickname, flags)
try:
find_cert_from_txt(certs, st)
@@ -672,12 +661,10 @@ class NSSDatabase(object):
raise ValueError('%s contains more than one certificate' %
location)
- def add_cert(self, cert, nick, flags, pem=False):
+ def add_cert(self, cert, nick, flags):
flags = unparse_trust_flags(flags)
- args = ["-A", "-n", nick, "-t", flags]
- if pem:
- args.append("-a")
- self.run_certutil(args, stdin=cert)
+ args = ["-A", "-n", nick, "-t", flags, '-a']
+ self.run_certutil(args, stdin=cert.public_bytes(x509.Encoding.PEM))
def delete_cert(self, nick):
self.run_certutil(["-D", "-n", nick])
@@ -688,7 +675,6 @@ class NSSDatabase(object):
Raises a ValueError if the certificate is invalid.
"""
cert = self.get_cert(nickname)
- cert = x509.load_der_x509_certificate(cert)
try:
self.run_certutil(['-V', '-n', nickname, '-u', 'V'],
@@ -699,13 +685,12 @@ class NSSDatabase(object):
raise ValueError(e.output)
try:
- x509.match_hostname(cert, hostname)
+ cert.match_hostname(hostname)
except ValueError:
raise ValueError('invalid for server %s' % hostname)
def verify_ca_cert_validity(self, nickname):
cert = self.get_cert(nickname)
- cert = x509.load_der_x509_certificate(cert)
if not cert.subject:
raise ValueError("has empty subject")
@@ -736,6 +721,5 @@ class NSSDatabase(object):
def verify_kdc_cert_validity(self, nickname, realm):
nicknames = self.get_trust_chain(nickname)
certs = [self.get_cert(nickname) for nickname in nicknames]
- certs = [x509.load_der_x509_certificate(cert) for cert in certs]
verify_kdc_cert_validity(certs[-1], certs[:-1], realm)
diff --git a/ipaserver/install/ca.py b/ipaserver/install/ca.py
index 5b229c0b0..0616e755b 100644
--- a/ipaserver/install/ca.py
+++ b/ipaserver/install/ca.py
@@ -320,7 +320,7 @@ def install_step_1(standalone, replica_config, options):
subject_base=subject_base)
dsdb = certs.CertDB(
realm_name, nssdir=dirname, subject_base=subject_base)
- cacert = cadb.get_cert_from_db('caSigningCert cert-pki-ca', pem=False)
+ cacert = cadb.get_cert_from_db('caSigningCert cert-pki-ca')
nickname = certdb.get_ca_nickname(realm_name)
trust_flags = certdb.IPA_CA_TRUST_FLAGS
dsdb.add_cert(cacert, nickname, trust_flags)
diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py
index 23968861d..b11f560a7 100644
--- a/ipaserver/install/cainstance.py
+++ b/ipaserver/install/cainstance.py
@@ -791,24 +791,19 @@ class CAInstance(DogtagInstance):
data = base64.b64decode(chain)
# Get list of PEM certificates
- certlist = x509.pkcs7_to_pems(data, x509.DER)
+ certlist = x509.pkcs7_to_certs(data, x509.DER)
# We need to append the certs to the existing file, so start by
# reading the file
if ipautil.file_exists(paths.IPA_CA_CRT):
ca_certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
- ca_certs = [cert.public_bytes(serialization.Encoding.PEM)
- for cert in ca_certs]
certlist.extend(ca_certs)
# We have all the certificates in certlist, write them to a PEM file
for path in [paths.IPA_CA_CRT,
paths.KDC_CA_BUNDLE_PEM,
paths.CA_BUNDLE_PEM]:
- with open(path, 'w') as ipaca_pem:
- for cert in certlist:
- ipaca_pem.write(cert)
- ipaca_pem.write('\n')
+ x509.write_certificate_list(certlist, path)
def __request_ra_certificate(self):
# create a temp file storing the pwd
diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index b2c7a77f7..210e7d208 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -58,9 +58,7 @@ def get_cert_nickname(cert):
representation of the first RDN in the subject and subject_dn
is a DN object.
"""
- cert_obj = x509.load_pem_x509_certificate(cert)
- dn = DN(cert_obj.subject)
-
+ dn = DN(cert.subject)
return (str(dn[0]), dn)
@@ -323,30 +321,20 @@ class CertDB(object):
nick = get_ca_nickname(self.realm)
else:
nick = str(subject_dn)
- self.nssdb.add_cert(cert, nick, trust_flags, pem=True)
+ self.nssdb.add_cert(cert, nick, trust_flags)
except RuntimeError:
break
- def get_cert_from_db(self, nickname, pem=True):
+ def get_cert_from_db(self, nickname):
"""
Retrieve a certificate from the current NSS database for nickname.
-
- pem controls whether the value returned PEM or DER-encoded. The
- default is the data straight from certutil -a.
"""
try:
args = ["-L", "-n", nickname, "-a"]
result = self.run_certutil(args, capture_output=True)
- cert = result.output
- if pem:
- return cert
- else:
- cert, _start = find_cert_from_txt(cert, start=0)
- cert = x509.strip_header(cert)
- dercert = base64.b64decode(cert)
- return dercert
+ return x509.load_pem_x509_certificate(result.raw_output)
except ipautil.CalledProcessError:
- return ''
+ return None
def track_server_cert(self, nickname, principal, password_file=None, command=None):
"""
@@ -362,8 +350,7 @@ class CertDB(object):
return
cert = self.get_cert_from_db(nickname)
- cert_obj = x509.load_pem_x509_certificate(cert)
- subject = str(DN(cert_obj.subject))
+ subject = str(DN(cert.subject))
certmonger.add_principal(request_id, principal)
certmonger.add_subject(request_id, subject)
@@ -392,8 +379,10 @@ class CertDB(object):
try:
self.issue_server_cert(self.certreq_fname, self.certder_fname)
self.import_cert(self.certder_fname, nickname)
+
with open(self.certder_fname, "r") as f:
dercert = f.read()
+ return x509.load_der_x509_certificate(dercert)
finally:
for fname in (self.certreq_fname, self.certder_fname):
try:
@@ -401,8 +390,6 @@ class CertDB(object):
except OSError:
pass
- return dercert
-
def request_cert(
self, subject, certtype="rsa", keysize="2048",
san_dnsnames=None):
@@ -519,8 +506,8 @@ class CertDB(object):
with open(cert_fname, "w") as f:
f.write(cert)
- def add_cert(self, cert, nick, flags, pem=False):
- self.nssdb.add_cert(cert, nick, flags, pem)
+ def add_cert(self, cert, nick, flags):
+ self.nssdb.add_cert(cert, nick, flags)
def import_cert(self, cert_fname, nickname):
"""
@@ -594,8 +581,6 @@ class CertDB(object):
newca, _st = find_cert_from_txt(newca)
cacert = self.get_cert_from_db(self.cacert_name)
- if cacert != '':
- cacert, _st = find_cert_from_txt(cacert)
if newca == cacert:
return
@@ -649,7 +634,7 @@ class CertDB(object):
cert, st = find_cert_from_txt(certs, st)
except RuntimeError:
break
- self.add_cert(cert, 'CA %s' % num, EMPTY_TRUST_FLAGS, pem=True)
+ self.add_cert(cert, 'CA %s' % num, EMPTY_TRUST_FLAGS)
num += 1
# We only handle one server cert
diff --git a/ipaserver/install/dogtaginstance.py b/ipaserver/install/dogtaginstance.py
index a582275fe..e63831968 100644
--- a/ipaserver/install/dogtaginstance.py
+++ b/ipaserver/install/dogtaginstance.py
@@ -29,7 +29,7 @@ import dbus
from pki.client import PKIConnection
import pki.system
-from ipalib import api, errors
+from ipalib import api, errors, x509
from ipalib.install import certmonger
from ipaplatform import services
from ipaplatform.constants import constants
@@ -342,7 +342,7 @@ class DogtagInstance(service.Service):
needs to get the new certificate as well.
``directive`` is the directive to update in CS.cfg
- cert is a DER-encoded certificate.
+ cert is IPACertificate.
cs_cfg is the path to the CS.cfg file
"""
@@ -350,7 +350,8 @@ class DogtagInstance(service.Service):
installutils.set_directive(
cs_cfg,
directive,
- base64.b64encode(cert),
+ # the cert must be only the base64 string without headers
+ base64.b64encode(cert.public_bytes(x509.Encoding.DER)),
quotes=False,
separator='=')
diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py
index c0ad242d0..d823635ca 100644
--- a/ipaserver/install/dsinstance.py
+++ b/ipaserver/install/dsinstance.py
@@ -234,7 +234,7 @@ class DsInstance(service.Service):
self.pkcs12_info = None
self.cacert_name = None
self.ca_is_configured = True
- self.dercert = None
+ self.cert = None
self.idstart = None
self.idmax = None
self.ca_subject = None
@@ -791,7 +791,7 @@ class DsInstance(service.Service):
# We only handle one server cert
self.nickname = server_certs[0][0]
- self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False)
+ self.cert = dsdb.get_cert_from_db(self.nickname)
if self.ca_is_configured:
dsdb.track_server_cert(
@@ -834,7 +834,7 @@ class DsInstance(service.Service):
api.Backend.ldap2.disconnect()
api.Backend.ldap2.connect()
- self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False)
+ self.cert = dsdb.get_cert_from_db(self.nickname)
if prev_helper is not None:
self.add_cert_to_service()
@@ -888,12 +888,12 @@ class DsInstance(service.Service):
nicknames = dsdb.find_root_cert(self.cacert_name)[:-1]
for nickname in nicknames:
- cert = dsdb.get_cert_from_db(nickname, pem=False)
+ cert = dsdb.get_cert_from_db(nickname)
certstore.put_ca_cert_nss(conn, self.suffix, cert, nickname,
trust_flags[nickname])
nickname = self.cacert_name
- cert = dsdb.get_cert_from_db(nickname, pem=False)
+ cert = dsdb.get_cert_from_db(nickname)
cacert_flags = trust_flags[nickname]
if self.setup_pkinit:
cacert_flags = TrustFlags(
diff --git a/ipaserver/install/httpinstance.py b/ipaserver/install/httpinstance.py
index 6c56316a9..0b67d6093 100644
--- a/ipaserver/install/httpinstance.py
+++ b/ipaserver/install/httpinstance.py
@@ -138,7 +138,7 @@ class HTTPInstance(service.Service):
self.dm_password = dm_password
self.suffix = ipautil.realm_to_suffix(self.realm)
self.pkcs12_info = pkcs12_info
- self.dercert = None
+ self.cert = None
self.subject_base = subject_base
self.sub_dict = dict(
REALM=realm,
@@ -406,7 +406,7 @@ class HTTPInstance(service.Service):
nickname = server_certs[0][0]
if nickname == 'ipaCert':
nickname = server_certs[1][0]
- self.dercert = db.get_cert_from_db(nickname, pem=False)
+ self.cert = db.get_cert_from_db(nickname)
if self.ca_is_configured:
db.track_server_cert(nickname, self.principal, db.passwd_fname, 'restart_httpd')
@@ -443,7 +443,7 @@ class HTTPInstance(service.Service):
if prev_helper is not None:
certmonger.modify_ca_helper('IPA', prev_helper)
- self.dercert = db.get_cert_from_db(self.cert_nickname, pem=False)
+ self.cert = db.get_cert_from_db(self.cert_nickname)
if prev_helper is not None:
self.add_cert_to_service()
diff --git a/ipaserver/install/installutils.py b/ipaserver/install/installutils.py
index 4d06f50e4..ff37d8475 100644
--- a/ipaserver/install/installutils.py
+++ b/ipaserver/install/installutils.py
@@ -1054,9 +1054,8 @@ def load_pkcs12(cert_files, key_password, key_nickname, ca_cert_files,
if ca_cert is None:
ca_cert = cert
- cert_obj = x509.load_der_x509_certificate(cert)
- subject = DN(cert_obj.subject)
- issuer = DN(cert_obj.issuer)
+ subject = DN(cert.subject)
+ issuer = DN(cert.issuer)
if subject == issuer:
break
@@ -1183,11 +1182,9 @@ def load_external_cert(files, ca_subject):
ca_nickname = None
cache = {}
for nickname, _trust_flags in nssdb.list_certs():
- cert = nssdb.get_cert(nickname, pem=True)
-
- cert_obj = x509.load_pem_x509_certificate(cert)
- subject = DN(cert_obj.subject)
- issuer = DN(cert_obj.issuer)
+ cert = nssdb.get_cert(nickname)
+ subject = DN(cert.subject)
+ issuer = DN(cert.issuer)
cache[nickname] = (cert, subject, issuer)
if subject == ca_subject:
@@ -1220,11 +1217,11 @@ def load_external_cert(files, ca_subject):
(subject, ", ".join(files), e))
cert_file = tempfile.NamedTemporaryFile()
- cert_file.write(ca_cert_chain[0] + '\n')
+ cert_file.write(ca_cert_chain[0].public_bytes(x509.Encoding.PEM) + b'\n')
cert_file.flush()
ca_file = tempfile.NamedTemporaryFile()
- ca_file.write('\n'.join(ca_cert_chain[1:]) + '\n')
+ x509.write_certificate_list(ca_cert_chain[1:], ca_file.name)
ca_file.flush()
return cert_file, ca_file
diff --git a/ipaserver/install/ipa_cacert_manage.py b/ipaserver/install/ipa_cacert_manage.py
index 626968d84..24ef86fe1 100644
--- a/ipaserver/install/ipa_cacert_manage.py
+++ b/ipaserver/install/ipa_cacert_manage.py
@@ -22,7 +22,6 @@ from __future__ import print_function
import logging
import os
from optparse import OptionGroup # pylint: disable=deprecated-module
-from cryptography.hazmat.primitives import serialization
import gssapi
from ipalib.install import certmonger, certstore
@@ -161,7 +160,7 @@ class CACertManage(admintool.AdminTool):
"Found certmonger request id %r", self.request_id)
db = certs.CertDB(api.env.realm, nssdir=paths.PKI_TOMCAT_ALIAS_DIR)
- cert = db.get_cert_from_db(self.cert_nickname, pem=False)
+ cert = db.get_cert_from_db(self.cert_nickname)
options = self.options
if options.external_cert_files:
@@ -170,7 +169,7 @@ class CACertManage(admintool.AdminTool):
if options.self_signed is not None:
self_signed = options.self_signed
else:
- self_signed = x509.is_self_signed(cert, x509.DER)
+ self_signed = cert.is_self_signed()
if self_signed:
return self.renew_self_signed(ca)
@@ -205,38 +204,28 @@ class CACertManage(admintool.AdminTool):
"--external-cert-file=/path/to/signed_certificate "
"--external-cert-file=/path/to/external_ca_certificate")
- def renew_external_step_2(self, ca, old_cert_der):
+ def renew_external_step_2(self, ca, old_cert):
print("Importing the renewed CA certificate, please wait")
options = self.options
conn = api.Backend.ldap2
- old_cert_obj = x509.load_certificate(old_cert_der, x509.DER)
- old_der_subject = x509.get_der_subject(old_cert_der, x509.DER)
- old_spki = old_cert_obj.public_key().public_bytes(
- serialization.Encoding.DER,
- serialization.PublicFormat.SubjectPublicKeyInfo
- )
+ old_spki = old_cert.public_key_info_bytes
cert_file, ca_file = installutils.load_external_cert(
- options.external_cert_files, DN(old_cert_obj.subject))
+ options.external_cert_files, DN(old_cert.subject))
with open(cert_file.name) as f:
new_cert_data = f.read()
- new_cert_der = x509.normalize_certificate(new_cert_data)
- new_cert_obj = x509.load_certificate(new_cert_der, x509.DER)
- new_der_subject = x509.get_der_subject(new_cert_der, x509.DER)
- new_spki = new_cert_obj.public_key().public_bytes(
- serialization.Encoding.DER,
- serialization.PublicFormat.SubjectPublicKeyInfo
- )
-
- if new_cert_obj.subject != old_cert_obj.subject:
+ new_cert = x509.load_pem_x509_certificate(new_cert_data)
+ new_spki = new_cert.public_key_info_bytes
+
+ if new_cert.subject != old_cert.subject:
raise admintool.ScriptError(
"Subject name mismatch (visit "
"http://www.freeipa.org/page/Troubleshooting for "
"troubleshooting guide)")
- if new_der_subject != old_der_subject:
+ if new_cert.subject_bytes != old_cert.subject_bytes:
raise admintool.ScriptError(
"Subject name encoding mismatch (visit "
"http://www.freeipa.org/page/Troubleshooting for "
@@ -249,19 +238,18 @@ class CACertManage(admintool.AdminTool):
with certs.NSSDatabase() as tmpdb:
tmpdb.create_db()
- tmpdb.add_cert(old_cert_der, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS)
+ tmpdb.add_cert(old_cert, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS)
try:
- tmpdb.add_cert(new_cert_der, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS)
+ tmpdb.add_cert(new_cert, 'IPA CA', EXTERNAL_CA_TRUST_FLAGS)
except ipautil.CalledProcessError as e:
raise admintool.ScriptError(
"Not compatible with the current CA certificate: %s" % e)
ca_certs = x509.load_certificate_list_from_file(ca_file.name)
for ca_cert in ca_certs:
- data = ca_cert.public_bytes(serialization.Encoding.DER)
tmpdb.add_cert(
- data, str(DN(ca_cert.subject)), EXTERNAL_CA_TRUST_FLAGS)
+ ca_cert, str(DN(ca_cert.subject)), EXTERNAL_CA_TRUST_FLAGS)
try:
tmpdb.verify_ca_cert_validity('IPA CA')
@@ -286,6 +274,8 @@ class CACertManage(admintool.AdminTool):
dn = DN(('cn', self.cert_nickname), ('cn', 'ca_renewal'),
('cn', 'ipa'), ('cn', 'etc'), api.env.basedn)
+
+ new_cert_der = new_cert.public_bytes(x509.Encoding.DER)
try:
entry = conn.get_entry(dn, ['usercertificate'])
entry['usercertificate'] = [new_cert_der]
@@ -338,15 +328,14 @@ class CACertManage(admintool.AdminTool):
cert_filename = self.args[1]
try:
- cert_obj = x509.load_certificate_from_file(cert_filename)
+ cert = x509.load_certificate_from_file(cert_filename)
except IOError as e:
raise admintool.ScriptError(
"Can't open \"%s\": %s" % (cert_filename, e))
except (TypeError, ValueError) as e:
raise admintool.ScriptError("Not a valid certificate: %s" % e)
- cert = cert_obj.public_bytes(serialization.Encoding.DER)
- nickname = options.nickname or str(DN(cert_obj.subject))
+ nickname = options.nickname or str(DN(cert.subject))
ca_certs = certstore.get_ca_certs_nss(api.Backend.ldap2,
api.env.basedn,
diff --git a/ipaserver/install/ipa_server_certinstall.py b/ipaserver/install/ipa_server_certinstall.py
index 9c8f6e81a..4ea1217ca 100644
--- a/ipaserver/install/ipa_server_certinstall.py
+++ b/ipaserver/install/ipa_server_certinstall.py
@@ -212,8 +212,7 @@ class ServerCertInstall(admintool.AdminTool):
# Start tracking only if the cert was issued by IPA CA
# Retrieve IPA CA
ipa_ca_cert = cdb.get_cert_from_db(
- get_ca_nickname(api.env.realm),
- pem=False)
+ get_ca_nickname(api.env.realm))
# And compare with the CA which signed this certificate
if ca_cert == ipa_ca_cert:
certmonger.start_tracking(
@@ -289,8 +288,7 @@ class ServerCertInstall(admintool.AdminTool):
# Start tracking only if the cert was issued by IPA CA
# Retrieve IPA CA
ipa_ca_cert = cdb.get_cert_from_db(
- get_ca_nickname(api.env.realm),
- pem=False)
+ get_ca_nickname(api.env.realm))
# And compare with the CA which signed this certificate
if ca_cert == ipa_ca_cert:
cdb.track_server_cert(server_cert,
diff --git a/ipaserver/install/krainstance.py b/ipaserver/install/krainstance.py
index 29aaad539..5a780b25f 100644
--- a/ipaserver/install/krainstance.py
+++ b/ipaserver/install/krainstance.py
@@ -27,7 +27,6 @@ import six
# pylint: disable=import-error
from six.moves.configparser import RawConfigParser
# pylint: enable=import-error
-from cryptography.hazmat.primitives import serialization
from ipalib import api
from ipalib import x509
@@ -304,7 +303,7 @@ class KRAInstance(DogtagInstance):
# get RA agent certificate
cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
- cert_data = cert.public_bytes(serialization.Encoding.DER)
+ cert_data = cert.public_bytes(x509.Encoding.DER)
# connect to KRA database
conn = ldap2.ldap2(api)
diff --git a/ipaserver/install/plugins/upload_cacrt.py b/ipaserver/install/plugins/upload_cacrt.py
index 7b7e65ed6..25faa6e77 100644
--- a/ipaserver/install/plugins/upload_cacrt.py
+++ b/ipaserver/install/plugins/upload_cacrt.py
@@ -22,7 +22,7 @@ import logging
from ipalib.install import certstore
from ipaplatform.paths import paths
from ipaserver.install import certs
-from ipalib import Registry, errors
+from ipalib import Registry, errors, x509
from ipalib import Updater
from ipapython import certdb
from ipapython.dn import DN
@@ -60,7 +60,7 @@ class update_upload_cacrt(Updater):
continue
if nickname == ca_nickname and ca_enabled:
trust_flags = certdb.IPA_CA_TRUST_FLAGS
- cert = db.get_cert_from_db(nickname, pem=False)
+ cert = db.get_cert_from_db(nickname)
trust, _ca, eku = certstore.trust_flags_to_key_policy(trust_flags)
dn = DN(('cn', nickname), ('cn', 'certificates'), ('cn', 'ipa'),
@@ -90,6 +90,7 @@ class update_upload_cacrt(Updater):
pass
if ca_cert:
+ dercert = ca_cert.public_bytes(x509.Encoding.DER)
dn = DN(('cn', 'CACert'), ('cn', 'ipa'), ('cn','etc'),
self.api.env.basedn)
try:
@@ -98,11 +99,11 @@ class update_upload_cacrt(Updater):
entry = ldap.make_entry(dn)
entry['objectclass'] = ['nsContainer', 'pkiCA']
entry.single_value['cn'] = 'CAcert'
- entry.single_value['cACertificate;binary'] = ca_cert
+ entry.single_value['cACertificate;binary'] = dercert
ldap.add_entry(entry)
else:
if b'' in entry['cACertificate;binary']:
- entry.single_value['cACertificate;binary'] = ca_cert
+ entry.single_value['cACertificate;binary'] = dercert
ldap.update_entry(entry)
return False, []
diff --git a/ipaserver/install/server/upgrade.py b/ipaserver/install/server/upgrade.py
index 4eb2c9ed5..5e1d74b79 100644
--- a/ipaserver/install/server/upgrade.py
+++ b/ipaserver/install/server/upgrade.py
@@ -1602,7 +1602,7 @@ def disable_httpd_system_trust(http):
db = certs.CertDB(api.env.realm, nssdir=paths.HTTPD_ALIAS_DIR)
for nickname, trust_flags in db.list_certs():
if not trust_flags.has_key:
- cert = db.get_cert_from_db(nickname, pem=False)
+ cert = db.get_cert_from_db(nickname)
if cert:
ca_certs.append((cert, nickname, trust_flags))
diff --git a/ipaserver/install/service.py b/ipaserver/install/service.py
index 49cf0223c..d2c3bbd5b 100644
--- a/ipaserver/install/service.py
+++ b/ipaserver/install/service.py
@@ -32,7 +32,7 @@ from ipalib.install import certstore, sysrestore
from ipapython import ipautil
from ipapython.dn import DN
from ipapython import kerberos
-from ipalib import api, errors
+from ipalib import api, errors, x509
from ipaplatform import services
from ipaplatform.paths import paths
@@ -245,7 +245,7 @@ class Service(object):
self.suffix = DN()
self.service_prefix = service_prefix
self.keytab = keytab
- self.dercert = None
+ self.cert = None
self.api = api
self.service_user = service_user
self.keytab_user = service_user
@@ -370,7 +370,8 @@ class Service(object):
dn = DN(('krbprincipalname', self.principal), ('cn', 'services'),
('cn', 'accounts'), self.suffix)
entry = api.Backend.ldap2.get_entry(dn)
- entry.setdefault('userCertificate', []).append(self.dercert)
+ entry.setdefault('userCertificate', []).append(
+ self.cert.public_bytes(x509.Encoding.DER))
try:
api.Backend.ldap2.update_entry(entry)
except Exception as e:
diff --git a/ipaserver/plugins/ca.py b/ipaserver/plugins/ca.py
index 8db6ec549..6557ce2a0 100644
--- a/ipaserver/plugins/ca.py
+++ b/ipaserver/plugins/ca.py
@@ -181,8 +181,8 @@ def set_certificate_attrs(entry, options, want_cert=True):
if want_chain or full:
pkcs7_der = ca_api.read_ca_chain(ca_id)
- pems = x509.pkcs7_to_pems(pkcs7_der, x509.DER)
- ders = [x509.normalize_certificate(pem) for pem in pems]
+ certs = x509.pkcs7_to_certs(pkcs7_der, x509.DER)
+ ders = [cert.public_bytes(x509.Encoding.DER) for cert in certs]
entry['certificate_chain'] = ders
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index b2c5af82e..1b2991bd0 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -490,7 +490,8 @@ class BaseCertObject(Object):
"""
if 'certificate' in obj:
- cert = x509.load_pem_x509_certificate(obj['certificate'])
+ cert = x509.load_der_x509_certificate(
+ base64.b64decode(obj['certificate']))
obj['subject'] = DN(cert.subject)
obj['issuer'] = DN(cert.issuer)
obj['serial_number'] = cert.serial_number
@@ -505,7 +506,7 @@ class BaseCertObject(Object):
cert.fingerprint(hashes.SHA256()))
general_names = x509.process_othernames(
- x509.get_san_general_names(cert))
+ cert.san_general_names)
for gn in general_names:
try:
@@ -911,7 +912,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
profile = api.Command['certprofile_show'](profile_id)
store = profile['result']['ipacertprofilestoreissued'][0] == 'TRUE'
if store and 'certificate' in result:
- cert = str(result.get('certificate'))
+ cert = result.get('certificate')
kwargs = dict(addattr=u'usercertificate={}'.format(cert))
# note: we call different commands for the different
# principal types because handling of 'userCertificate'
@@ -927,7 +928,8 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
"used for krbtgt certificates")
if 'certificate_chain' in ca_obj:
- cert = x509.load_pem_x509_certificate(result['certificate'])
+ cert = x509.load_der_x509_certificate(
+ base64.b64decode(result['certificate']))
cert = cert.public_bytes(serialization.Encoding.DER)
result['certificate_chain'] = [cert] + ca_obj['certificate_chain']
@@ -1191,7 +1193,8 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
# we don't tell Dogtag the issuer (but we check the cert after).
#
result = self.Backend.ra.get_certificate(str(serial_number))
- cert = x509.load_pem_x509_certificate(result['certificate'])
+ cert = x509.load_der_x509_certificate(
+ base64.b64decode(result['certificate']))
try:
self.check_access()
diff --git a/ipaserver/plugins/certmap.py b/ipaserver/plugins/certmap.py
index ee8938c72..f26bf672a 100644
--- a/ipaserver/plugins/certmap.py
+++ b/ipaserver/plugins/certmap.py
@@ -17,7 +17,6 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-import base64
import logging
import dbus
@@ -439,12 +438,14 @@ class _sssd(object):
:raise RemoteRetrieveError: if DBus error occurs
"""
try:
- pem = x509.make_pem(base64.b64encode(cert))
+ cert_obj = x509.load_der_x509_certificate(cert)
# bug 3306 in sssd returns 0 entry when max_entries = 0
# Temp workaround is to use a non-null value, not too high
# to avoid reserving unneeded memory
max_entries = dbus.UInt32(100)
- user_paths = self._users_iface.ListByCertificate(pem, max_entries)
+ user_paths = self._users_iface.ListByCertificate(
+ cert_obj.public_bytes(x509.Encoding.PEM),
+ max_entries)
users = dict()
for user_path in user_paths:
user_obj = self._bus.get_object(DBUS_SSSD_NAME, user_path)
diff --git a/ipatests/test_ipalib/test_x509.py b/ipatests/test_ipalib/test_x509.py
index 332518013..904152aac 100644
--- a/ipatests/test_ipalib/test_x509.py
+++ b/ipatests/test_ipalib/test_x509.py
@@ -81,11 +81,11 @@ class test_x509(object):
# Load a good cert with headers and leading text
newcert = (
- 'leading text\n' + goodcert_headers)
+ b'leading text\n' + goodcert_headers)
x509.load_pem_x509_certificate(newcert)
# Load a good cert with bad headers
- newcert = '-----BEGIN CERTIFICATE-----' + goodcert_headers
+ newcert = b'-----BEGIN CERTIFICATE-----' + goodcert_headers
with pytest.raises((TypeError, ValueError)):
x509.load_pem_x509_certificate(newcert)
diff --git a/ipatests/test_ipaserver/test_ldap.py b/ipatests/test_ipaserver/test_ldap.py
index a9e4256cb..88638dea2 100644
--- a/ipatests/test_ipaserver/test_ldap.py
+++ b/ipatests/test_ipaserver/test_ldap.py
@@ -78,9 +78,8 @@ class test_ldap(object):
self.conn.connect(autobind=AUTOBIND_DISABLED)
entry_attrs = self.conn.get_entry(self.dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
- cert = cert[0]
- serial = x509.load_certificate(cert, x509.DER).serial_number
- assert serial is not None
+ cert = x509.load_der_x509_certificate(cert[0])
+ assert cert.serial_number is not None
def test_simple(self):
"""
@@ -96,9 +95,8 @@ class test_ldap(object):
self.conn.connect(bind_dn=DN(('cn', 'directory manager')), bind_pw=dm_password)
entry_attrs = self.conn.get_entry(self.dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
- cert = cert[0]
- serial = x509.load_certificate(cert, x509.DER).serial_number
- assert serial is not None
+ cert = x509.load_der_x509_certificate(cert[0])
+ assert cert.serial_number is not None
def test_Backend(self):
"""
@@ -123,9 +121,8 @@ class test_ldap(object):
result = myapi.Command['service_show']('ldap/%s@%s' % (api.env.host, api.env.realm,))
entry_attrs = result['result']
cert = entry_attrs.get('usercertificate')
- cert = cert[0]
- serial = x509.load_certificate(cert, x509.DER).serial_number
- assert serial is not None
+ cert = x509.load_der_x509_certificate(cert[0])
+ assert cert.serial_number is not None
def test_autobind(self):
"""
@@ -138,9 +135,8 @@ class test_ldap(object):
raise nose.SkipTest("Only executed as root")
entry_attrs = self.conn.get_entry(self.dn, ['usercertificate'])
cert = entry_attrs.get('usercertificate')
- cert = cert[0]
- serial = x509.load_certificate(cert, x509.DER).serial_number
- assert serial is not None
+ cert = x509.load_der_x509_certificate(cert[0])
+ assert cert.serial_number is not None
@pytest.mark.tier0
diff --git a/ipatests/test_xmlrpc/testcert.py b/ipatests/test_xmlrpc/testcert.py
index c27ea95b0..151919180 100644
--- a/ipatests/test_xmlrpc/testcert.py
+++ b/ipatests/test_xmlrpc/testcert.py
@@ -29,6 +29,7 @@ import os
import tempfile
import shutil
import six
+import base64
from ipalib import api, x509
from ipaserver.plugins import rabase
@@ -96,4 +97,6 @@ def makecert(reqdir, subject, principal):
csr = unicode(generate_csr(reqdir, pwname, str(subject)))
res = api.Command['cert_request'](csr, principal=principal, add=True)
- return x509.make_pem(res['result']['certificate'])
+ cert = x509.load_der_x509_certificate(
+ base64.b64decode(res['result']['certificate']))
+ return cert.public_bytes(x509.Encoding.PEM).decode('utf-8')