summaryrefslogtreecommitdiffstats
path: root/ipalib/certstore.py
diff options
context:
space:
mode:
authorJan Cholasta <jcholast@redhat.com>2014-06-10 14:21:58 +0200
committerPetr Viktorin <pviktori@redhat.com>2014-07-30 16:04:21 +0200
commitde695e688e390deef5510dca0daef133f50f490f (patch)
treefd626a1fd6a396ee8ec5cd6839740b57c947af70 /ipalib/certstore.py
parent239ef955af58c0a3e167dc85e3f2a0168a91989c (diff)
downloadfreeipa-de695e688e390deef5510dca0daef133f50f490f.tar.gz
freeipa-de695e688e390deef5510dca0daef133f50f490f.tar.xz
freeipa-de695e688e390deef5510dca0daef133f50f490f.zip
Add certificate store module ipalib.certstore.
Part of https://fedorahosted.org/freeipa/ticket/3259 Part of https://fedorahosted.org/freeipa/ticket/3520 Reviewed-By: Rob Crittenden <rcritten@redhat.com>
Diffstat (limited to 'ipalib/certstore.py')
-rw-r--r--ipalib/certstore.py397
1 files changed, 397 insertions, 0 deletions
diff --git a/ipalib/certstore.py b/ipalib/certstore.py
new file mode 100644
index 000000000..8a9b41015
--- /dev/null
+++ b/ipalib/certstore.py
@@ -0,0 +1,397 @@
+# Authors:
+# Jan Cholasta <jcholast@redhat.com>
+#
+# Copyright (C) 2014 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""
+LDAP shared certificate store.
+"""
+
+from nss.error import NSPRError
+from pyasn1.error import PyAsn1Error
+
+from ipapython.dn import DN
+from ipapython.certdb import get_ca_nickname
+from ipalib import errors, x509
+
+
+def _parse_cert(dercert):
+ try:
+ subject = x509.get_subject(dercert, x509.DER)
+ issuer = x509.get_issuer(dercert, x509.DER)
+ serial_number = x509.get_serial_number(dercert, x509.DER)
+ public_key_info = x509.get_der_public_key_info(dercert, x509.DER)
+ except (NSPRError, PyAsn1Error), e:
+ raise ValueError("failed to decode certificate: %s" % e)
+
+ subject = str(subject).replace('\\;', '\\3b')
+ issuer = str(issuer).replace('\\;', '\\3b')
+ issuer_serial = '%s;%s' % (issuer, serial_number)
+
+ return subject, issuer_serial, public_key_info
+
+
+def init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage):
+ """
+ Initialize certificate store entry for a CA certificate.
+ """
+ subject, issuer_serial, public_key = _parse_cert(dercert)
+
+ if ext_key_usage is not None:
+ try:
+ cert_eku = x509.get_ext_key_usage(dercert, x509.DER)
+ except NSPRError, e:
+ raise ValueError("failed to decode certificate: %s" % e)
+ if cert_eku is not None:
+ cert_eku -= {x509.EKU_SERVER_AUTH, x509.EKU_CLIENT_AUTH,
+ x509.EKU_EMAIL_PROTECTION, x509.EKU_CODE_SIGNING,
+ x509.EKU_ANY, x509.EKU_PLACEHOLDER}
+ ext_key_usage = ext_key_usage | cert_eku
+
+ entry['objectClass'] = ['ipaCertificate', 'pkiCA', 'ipaKeyPolicy']
+ entry['cn'] = [nickname]
+
+ entry['ipaCertSubject'] = [subject]
+ entry['ipaCertIssuerSerial'] = [issuer_serial]
+ entry['ipaPublicKey'] = [public_key]
+ entry['cACertificate;binary'] = [dercert]
+
+ if trusted is not None:
+ entry['ipaKeyTrust'] = ['trusted' if trusted else 'distrusted']
+ if ext_key_usage is not None:
+ ext_key_usage = list(ext_key_usage)
+ if not ext_key_usage:
+ ext_key_usage.append(x509.EKU_PLACEHOLDER)
+ entry['ipaKeyExtUsage'] = ext_key_usage
+
+
+def update_compat_ca(ldap, base_dn, dercert):
+ """
+ Update the CA certificate in cn=CAcert,cn=ipa,cn=etc,SUFFIX.
+ """
+ dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), base_dn)
+ try:
+ entry = ldap.get_entry(dn, attrs_list=['cACertificate;binary'])
+ entry.single_value['cACertificate;binary'] = dercert
+ ldap.update_entry(entry)
+ except errors.NotFound:
+ entry = ldap.make_entry(dn)
+ entry['objectClass'] = ['nsContainer', 'pkiCA']
+ entry.single_value['cn'] = 'CAcert'
+ entry.single_value['cACertificate;binary'] = dercert
+ ldap.add_entry(entry)
+ except errors.EmptyModlist:
+ pass
+
+
+def clean_old_config(ldap, base_dn, dn, config_ipa, config_compat):
+ """
+ Remove ipaCA and compatCA flags from their previous carriers.
+ """
+ if not config_ipa and not config_compat:
+ return
+
+ try:
+ result, truncated = ldap.find_entries(
+ base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
+ base_dn),
+ filter='(|(ipaConfigString=ipaCA)(ipaConfigString=compatCA))',
+ attrs_list=['ipaConfigString'])
+ except errors.NotFound:
+ return
+
+ for entry in result:
+ if entry.dn == dn:
+ continue
+ for config in list(entry['ipaConfigString']):
+ if config.lower() == 'ipaca' and config_ipa:
+ entry['ipaConfigString'].remove(config)
+ elif config.lower() == 'compatca' and config_compat:
+ entry['ipaConfigString'].remove(config)
+ try:
+ ldap.update_entry(entry)
+ except errors.EmptyModlist:
+ pass
+
+
+def add_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
+ ext_key_usage=None, config_ipa=False, config_compat=False):
+ """
+ Add new entry for a CA certificate to the certificate store.
+ """
+ container_dn = DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
+ base_dn)
+ dn = DN(('cn', nickname), container_dn)
+ entry = ldap.make_entry(dn)
+
+ init_ca_entry(entry, dercert, nickname, trusted, ext_key_usage)
+
+ if config_ipa:
+ entry.setdefault('ipaConfigString', []).append('ipaCA')
+ if config_compat:
+ entry.setdefault('ipaConfigString', []).append('compatCA')
+
+ if config_compat:
+ update_compat_ca(ldap, base_dn, dercert)
+
+ ldap.add_entry(entry)
+ clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
+
+
+def update_ca_cert(ldap, base_dn, dercert, trusted=None, ext_key_usage=None,
+ config_ipa=False, config_compat=False):
+ """
+ Update existing entry for a CA certificate in the certificate store.
+ """
+ subject, issuer_serial, public_key = _parse_cert(dercert)
+
+ filter = ldap.make_filter({'ipaCertSubject': subject})
+ result, truncated = ldap.find_entries(
+ base_dn=DN(('cn', 'certificates'), ('cn', 'ipa'), ('cn', 'etc'),
+ base_dn),
+ filter=filter,
+ attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial',
+ 'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
+ 'ipaConfigString', 'cACertificate;binary'])
+ entry = result[0]
+ dn = entry.dn
+
+ for old_cert in entry['cACertificate;binary']:
+ # Check if we are adding a new cert
+ if old_cert == dercert:
+ break
+ else:
+ # We are adding a new cert, validate it
+ if entry.single_value['ipaCertSubject'].lower() != subject.lower():
+ raise ValueError("subject name mismatch")
+ if entry.single_value['ipaPublicKey'] != public_key:
+ raise ValueError("subject public key info mismatch")
+ entry['ipaCertIssuerSerial'].append(issuer_serial)
+ entry['cACertificate;binary'].append(dercert)
+
+ # Update key trust
+ if trusted is not None:
+ old_trust = entry.single_value.get('ipaKeyTrust')
+ new_trust = 'trusted' if trusted else 'distrusted'
+ if old_trust is not None and old_trust.lower() != new_trust:
+ raise ValueError("inconsistent trust")
+ entry.single_value['ipaKeyTrust'] = new_trust
+
+ # Update extended key usage
+ if trusted is not False:
+ if ext_key_usage is not None:
+ old_eku = set(entry.get('ipaKeyExtUsage', []))
+ old_eku.discard(x509.EKU_PLACEHOLDER)
+ new_eku = old_eku | ext_key_usage
+ if not new_eku:
+ new_eku.add(x509.EKU_PLACEHOLDER)
+ entry['ipaKeyExtUsage'] = list(new_eku)
+ else:
+ entry.pop('ipaKeyExtUsage', None)
+
+ # Update configuration flags
+ is_ipa = False
+ is_compat = False
+ for config in entry.get('ipaConfigString', []):
+ if config.lower() == 'ipaca':
+ is_ipa = True
+ elif config.lower() == 'compatca':
+ is_compat = True
+ if config_ipa and not is_ipa:
+ entry.setdefault('ipaConfigString', []).append('ipaCA')
+ if config_compat and not is_compat:
+ entry.setdefault('ipaConfigString', []).append('compatCA')
+
+ if is_compat or config_compat:
+ update_compat_ca(ldap, base_dn, dercert)
+
+ ldap.update_entry(entry)
+ clean_old_config(ldap, base_dn, dn, config_ipa, config_compat)
+
+
+def put_ca_cert(ldap, base_dn, dercert, nickname, trusted=None,
+ ext_key_usage=None, config_ipa=False, config_compat=False):
+ """
+ Add or update entry for a CA certificate in the certificate store.
+ """
+ try:
+ update_ca_cert(ldap, base_dn, dercert, trusted, ext_key_usage,
+ config_ipa=config_ipa, config_compat=config_compat)
+ except errors.NotFound:
+ add_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ config_ipa=config_ipa, config_compat=config_compat)
+ except errors.EmptyModlist:
+ pass
+
+
+def get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
+ filter_subject=None):
+ """
+ Get CA certificates and associated key policy from the certificate store.
+ """
+ if filter_subject is not None:
+ if not isinstance(filter_subject, list):
+ filter_subject = [filter_subject]
+ filter_subject = [str(subj).replace('\\;', '\\3b')
+ for subj in filter_subject]
+
+ config_dn = DN(('cn', 'ipa'), ('cn', 'etc'), base_dn)
+ container_dn = DN(('cn', 'certificates'), config_dn)
+ try:
+ # Search the certificate store for CA certificate entries
+ filters = ['(objectClass=ipaCertificate)', '(objectClass=pkiCA)']
+ if filter_subject:
+ filter = ldap.make_filter({'ipaCertSubject': filter_subject})
+ filters.append(filter)
+ result, truncated = ldap.find_entries(
+ base_dn=container_dn,
+ filter=ldap.combine_filters(filters, ldap.MATCH_ALL),
+ attrs_list=['cn', 'ipaCertSubject', 'ipaCertIssuerSerial',
+ 'ipaPublicKey', 'ipaKeyTrust', 'ipaKeyExtUsage',
+ 'cACertificate;binary'])
+
+ certs = []
+ for entry in result:
+ nickname = entry.single_value['cn']
+ trusted = entry.single_value.get('ipaKeyTrust', 'unknown').lower()
+ if trusted == 'trusted':
+ trusted = True
+ elif trusted == 'distrusted':
+ trusted = False
+ else:
+ trusted = None
+ ext_key_usage = entry.get('ipaKeyExtUsage')
+ if ext_key_usage is not None:
+ ext_key_usage = set(str(p) for p in ext_key_usage)
+ ext_key_usage.discard(x509.EKU_PLACEHOLDER)
+
+ for cert in entry.get('cACertificate;binary', []):
+ certs.append((cert, nickname, trusted, ext_key_usage))
+
+ return certs
+ except errors.NotFound:
+ try:
+ ldap.get_entry(container_dn, [''])
+ except errors.NotFound:
+ pass
+ else:
+ return []
+
+ # Fallback to cn=CAcert,cn=ipa,cn=etc,SUFFIX
+ dn = DN(('cn', 'CAcert'), config_dn)
+ entry = ldap.get_entry(dn, ['cACertificate;binary'])
+
+ cert = entry.single_value['cACertificate;binary']
+ subject, issuer_serial, public_key_info = _parse_cert(cert)
+ if filter_subject is not None and subject not in filter_subject:
+ return []
+
+ nickname = get_ca_nickname(compat_realm)
+ ext_key_usage = {x509.EKU_SERVER_AUTH}
+ if compat_ipa_ca:
+ ext_key_usage |= {x509.EKU_CLIENT_AUTH,
+ x509.EKU_EMAIL_PROTECTION,
+ x509.EKU_CODE_SIGNING}
+
+ return [(cert, nickname, True, ext_key_usage)]
+
+
+def trust_flags_to_key_policy(trust_flags):
+ """
+ Convert certutil trust flags to certificate store key policy.
+ """
+ if 'p' in trust_flags:
+ if 'C' in trust_flags or 'P' in trust_flags or 'T' in trust_flags:
+ raise ValueError("cannot be both trusted and not trusted")
+ return False, None, None
+ elif 'C' in trust_flags or 'T' in trust_flags:
+ if 'P' in trust_flags:
+ raise ValueError("cannot be both CA and not CA")
+ ca = True
+ elif 'P' in trust_flags:
+ ca = False
+ else:
+ return None, None, set()
+
+ trust_flags = trust_flags.split(',')
+ ext_key_usage = set()
+ for i, kp in enumerate((x509.EKU_SERVER_AUTH,
+ x509.EKU_EMAIL_PROTECTION,
+ x509.EKU_CODE_SIGNING)):
+ if 'C' in trust_flags[i] or 'P' in trust_flags[i]:
+ ext_key_usage.add(kp)
+ if 'T' in trust_flags[0]:
+ ext_key_usage.add(x509.EKU_CLIENT_AUTH)
+
+ return True, ca, ext_key_usage
+
+
+def key_policy_to_trust_flags(trusted, ca, ext_key_usage):
+ """
+ Convert certificate store key policy to certutil trust flags.
+ """
+ if trusted is False:
+ return 'p,p,p'
+ elif trusted is None or ca is None:
+ return ',,'
+ elif ext_key_usage is None:
+ if ca:
+ return 'CT,C,C'
+ else:
+ return 'P,P,P'
+
+ trust_flags = ['', '', '']
+ for i, kp in enumerate((x509.EKU_SERVER_AUTH,
+ x509.EKU_EMAIL_PROTECTION,
+ x509.EKU_CODE_SIGNING)):
+ if kp in ext_key_usage:
+ trust_flags[i] += ('C' if ca else 'P')
+ if ca and x509.EKU_CLIENT_AUTH in ext_key_usage:
+ trust_flags[0] += 'T'
+
+ trust_flags = ','.join(trust_flags)
+ return trust_flags
+
+
+def put_ca_cert_nss(ldap, base_dn, dercert, nickname, trust_flags,
+ config_ipa=False, config_compat=False):
+ """
+ Add or update entry for a CA certificate in the certificate store.
+ """
+ trusted, ca, ext_key_usage = trust_flags_to_key_policy(trust_flags)
+ if ca is False:
+ raise ValueError("must be CA certificate")
+
+ put_ca_cert(ldap, base_dn, dercert, nickname, trusted, ext_key_usage,
+ config_ipa, config_compat)
+
+
+def get_ca_certs_nss(ldap, base_dn, compat_realm, compat_ipa_ca,
+ filter_subject=None):
+ """
+ Get CA certificates and associated trust flags from the certificate store.
+ """
+ nss_certs = []
+
+ certs = get_ca_certs(ldap, base_dn, compat_realm, compat_ipa_ca,
+ filter_subject=filter_subject)
+ for cert, nickname, trusted, ext_key_usage in certs:
+ trust_flags = key_policy_to_trust_flags(trusted, True, ext_key_usage)
+ nss_certs.append((cert, nickname, trust_flags))
+
+ return nss_certs