summaryrefslogtreecommitdiffstats
path: root/ipapython/dnssec/ldapkeydb.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipapython/dnssec/ldapkeydb.py')
-rw-r--r--ipapython/dnssec/ldapkeydb.py450
1 files changed, 0 insertions, 450 deletions
diff --git a/ipapython/dnssec/ldapkeydb.py b/ipapython/dnssec/ldapkeydb.py
deleted file mode 100644
index aa0413934..000000000
--- a/ipapython/dnssec/ldapkeydb.py
+++ /dev/null
@@ -1,450 +0,0 @@
-#
-# Copyright (C) 2014 FreeIPA Contributors see COPYING for license
-#
-
-from __future__ import print_function
-
-from binascii import hexlify
-import collections
-from pprint import pprint
-
-import ipalib
-from ipapython.dn import DN
-from ipapython import ipaldap
-from ipapython import ipa_log_manager
-
-from ipapython.dnssec.abshsm import (
- attrs_name2id,
- AbstractHSM,
- bool_attr_names,
- populate_pkcs11_metadata)
-from ipapython import p11helper as _ipap11helper
-import uuid
-
-def uri_escape(val):
- """convert val to %-notation suitable for ID component in URI"""
- assert len(val) > 0, "zero-length URI component detected"
- hexval = hexlify(val)
- out = '%'
- # pylint: disable=E1127
- out += '%'.join(hexval[i:i+2] for i in range(0, len(hexval), 2))
- return out
-
-def ldap_bool(val):
- if val == 'TRUE' or val is True:
- return True
- elif val == 'FALSE' or val is False:
- return False
- else:
- raise AssertionError('invalid LDAP boolean "%s"' % val)
-
-def get_default_attrs(object_classes):
- # object class -> default attribute values mapping
- defaults = {
- u'ipk11publickey': {
- 'ipk11copyable': True,
- 'ipk11derive': False,
- 'ipk11encrypt': False,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11private': True,
- 'ipk11trusted': False,
- 'ipk11verify': True,
- 'ipk11verifyrecover': True,
- 'ipk11wrap': False
- },
- u'ipk11privatekey': {
- 'ipk11alwaysauthenticate': False,
- 'ipk11alwayssensitive': True,
- 'ipk11copyable': True,
- 'ipk11decrypt': False,
- 'ipk11derive': False,
- 'ipk11extractable': True,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11neverextractable': False,
- 'ipk11private': True,
- 'ipk11sensitive': True,
- 'ipk11sign': True,
- 'ipk11signrecover': True,
- 'ipk11unwrap': False,
- 'ipk11wrapwithtrusted': False
- },
- u'ipk11secretkey': {
- 'ipk11alwaysauthenticate': False,
- 'ipk11alwayssensitive': True,
- 'ipk11copyable': True,
- 'ipk11decrypt': False,
- 'ipk11derive': False,
- 'ipk11encrypt': False,
- 'ipk11extractable': True,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11neverextractable': False,
- 'ipk11private': True,
- 'ipk11sensitive': True,
- 'ipk11sign': False,
- 'ipk11trusted': False,
- 'ipk11unwrap': True,
- 'ipk11verify': False,
- 'ipk11wrap': True,
- 'ipk11wrapwithtrusted': False
- }
- }
-
- # get set of supported object classes
- present_clss = set()
- for cls in object_classes:
- present_clss.add(cls.lower())
- present_clss.intersection_update(set(defaults.keys()))
- if len(present_clss) <= 0:
- raise AssertionError('none of "%s" object classes are supported' %
- object_classes)
-
- result = {}
- for cls in present_clss:
- result.update(defaults[cls])
- return result
-
-
-class Key(collections.MutableMapping):
- """abstraction to hide LDAP entry weirdnesses:
- - non-normalized attribute names
- - boolean attributes returned as strings
- - planned entry deletion prevents subsequent use of the instance
- """
- def __init__(self, entry, ldap, ldapkeydb):
- self.entry = entry
- self._delentry = None # indicates that object was deleted
- self.ldap = ldap
- self.ldapkeydb = ldapkeydb
- self.log = ldap.log.getChild(__name__)
-
- def __assert_not_deleted(self):
- assert self.entry and not self._delentry, (
- "attempt to use to-be-deleted entry %s detected"
- % self._delentry.dn)
-
- def __getitem__(self, key):
- self.__assert_not_deleted()
- val = self.entry.single_value[key]
- if key.lower() in bool_attr_names:
- val = ldap_bool(val)
- return val
-
- def __setitem__(self, key, value):
- self.__assert_not_deleted()
- self.entry[key] = value
-
- def __delitem__(self, key):
- self.__assert_not_deleted()
- del self.entry[key]
-
- def __iter__(self):
- """generates list of ipa names of all PKCS#11 attributes present in the object"""
- self.__assert_not_deleted()
- for ipa_name in list(self.entry.keys()):
- lowercase = ipa_name.lower()
- if lowercase in attrs_name2id:
- yield lowercase
-
- def __len__(self):
- self.__assert_not_deleted()
- return len(self.entry)
-
- def __repr__(self):
- if self._delentry:
- return 'deleted entry: %s' % repr(self._delentry)
-
- sanitized = dict(self.entry)
- for attr in ['ipaPrivateKey', 'ipaPublicKey', 'ipk11publickeyinfo']:
- if attr in sanitized:
- del sanitized[attr]
- return repr(sanitized)
-
- def _cleanup_key(self):
- """remove default values from LDAP entry"""
- default_attrs = get_default_attrs(self.entry['objectclass'])
- empty = object()
- for attr in default_attrs:
- if self.get(attr, empty) == default_attrs[attr]:
- del self[attr]
-
- def _update_key(self):
- """remove default values from LDAP entry and write back changes"""
- if self._delentry:
- self._delete_key()
- return
-
- self._cleanup_key()
-
- try:
- self.ldap.update_entry(self.entry)
- except ipalib.errors.EmptyModlist:
- pass
-
- def _delete_key(self):
- """remove key metadata entry from LDAP
-
- After calling this, the python object is no longer valid and all
- subsequent method calls on it will fail.
- """
- assert not self.entry, (
- "Key._delete_key() called before Key.schedule_deletion()")
- assert self._delentry, "Key._delete_key() called more than once"
- self.log.debug('deleting key id 0x%s DN %s from LDAP',
- hexlify(self._delentry.single_value['ipk11id']),
- self._delentry.dn)
- self.ldap.delete_entry(self._delentry)
- self._delentry = None
- self.ldap = None
- self.ldapkeydb = None
-
- def schedule_deletion(self):
- """schedule key deletion from LDAP
-
- Calling schedule_deletion() will make this object incompatible with
- normal Key. After that the object must not be read or modified.
- Key metadata will be actually deleted when LdapKeyDB.flush() is called.
- """
- assert not self._delentry, (
- "Key.schedule_deletion() called more than once")
- self._delentry = self.entry
- self.entry = None
-
-
-class ReplicaKey(Key):
- # TODO: object class assert
- def __init__(self, entry, ldap, ldapkeydb):
- super(ReplicaKey, self).__init__(entry, ldap, ldapkeydb)
-
-class MasterKey(Key):
- # TODO: object class assert
- def __init__(self, entry, ldap, ldapkeydb):
- super(MasterKey, self).__init__(entry, ldap, ldapkeydb)
-
- @property
- def wrapped_entries(self):
- """LDAP entires with wrapped data
-
- One entry = one blob + ipaWrappingKey pointer to unwrapping key"""
-
- keys = []
- if 'ipaSecretKeyRef' not in self.entry:
- return keys
-
- for dn in self.entry['ipaSecretKeyRef']:
- try:
- obj = self.ldap.get_entry(dn)
- keys.append(obj)
- except ipalib.errors.NotFound:
- continue
-
- return keys
-
- def add_wrapped_data(self, data, wrapping_mech, replica_key_id):
- wrapping_key_uri = 'pkcs11:id=%s;type=public' \
- % uri_escape(replica_key_id)
- # TODO: replace this with 'autogenerate' to prevent collisions
- uuid_rdn = DN('ipk11UniqueId=%s' % uuid.uuid1())
- entry_dn = DN(uuid_rdn, self.ldapkeydb.base_dn)
- entry = self.ldap.make_entry(entry_dn,
- objectClass=['ipaSecretKeyObject', 'ipk11Object'],
- ipaSecretKey=data,
- ipaWrappingKey=wrapping_key_uri,
- ipaWrappingMech=wrapping_mech)
-
- self.log.info('adding master key 0x%s wrapped with replica key 0x%s to %s',
- hexlify(self['ipk11id']),
- hexlify(replica_key_id),
- entry_dn)
- self.ldap.add_entry(entry)
- if 'ipaSecretKeyRef' not in self.entry:
- self.entry['objectClass'] += ['ipaSecretKeyRefObject']
- self.entry.setdefault('ipaSecretKeyRef', []).append(entry_dn)
-
-
-class LdapKeyDB(AbstractHSM):
- def __init__(self, log, ldap, base_dn):
- self.ldap = ldap
- self.base_dn = base_dn
- self.log = log
- self.cache_replica_pubkeys_wrap = None
- self.cache_masterkeys = None
- self.cache_zone_keypairs = None
-
- def _get_key_dict(self, key_type, ldap_filter):
- try:
- objs = self.ldap.get_entries(base_dn=self.base_dn,
- filter=ldap_filter)
- except ipalib.errors.NotFound:
- return {}
-
- keys = {}
- for o in objs:
- # add default values not present in LDAP
- key = key_type(o, self.ldap, self)
- default_attrs = get_default_attrs(key.entry['objectclass'])
- for attr in default_attrs:
- key.setdefault(attr, default_attrs[attr])
-
- assert 'ipk11id' in key, 'key is missing ipk11Id in %s' % key.entry.dn
- key_id = key['ipk11id']
- assert key_id not in keys, 'duplicate ipk11Id=0x%s in "%s" and "%s"' % (hexlify(key_id), key.entry.dn, keys[key_id].entry.dn)
- assert 'ipk11label' in key, 'key "%s" is missing ipk11Label' % key.entry.dn
- assert 'objectclass' in key.entry, 'key "%s" is missing objectClass attribute' % key.entry.dn
-
- keys[key_id] = key
-
- self._update_keys()
- return keys
-
- def _update_keys(self):
- for cache in [self.cache_masterkeys, self.cache_replica_pubkeys_wrap,
- self.cache_zone_keypairs]:
- if cache:
- for key in cache.values():
- key._update_key()
-
- def flush(self):
- """write back content of caches to LDAP"""
- self._update_keys()
- self.cache_masterkeys = None
- self.cache_replica_pubkeys_wrap = None
- self.cache_zone_keypairs = None
-
- def _import_keys_metadata(self, source_keys):
- """import key metadata from Key-compatible objects
-
- metadata from multiple source keys can be imported into single LDAP
- object
-
- :param: source_keys is iterable of (Key object, PKCS#11 object class)"""
-
- entry_dn = DN('ipk11UniqueId=autogenerate', self.base_dn)
- entry = self.ldap.make_entry(entry_dn, objectClass=['ipk11Object'])
- new_key = Key(entry, self.ldap, self)
-
- for source_key, pkcs11_class in source_keys:
- if pkcs11_class == _ipap11helper.KEY_CLASS_SECRET_KEY:
- entry['objectClass'].append('ipk11SecretKey')
- elif pkcs11_class == _ipap11helper.KEY_CLASS_PUBLIC_KEY:
- entry['objectClass'].append('ipk11PublicKey')
- elif pkcs11_class == _ipap11helper.KEY_CLASS_PRIVATE_KEY:
- entry['objectClass'].append('ipk11PrivateKey')
- else:
- raise AssertionError('unsupported object class %s' % pkcs11_class)
-
- populate_pkcs11_metadata(source_key, new_key)
- new_key._cleanup_key()
- return new_key
-
- def import_master_key(self, mkey):
- new_key = self._import_keys_metadata(
- [(mkey, _ipap11helper.KEY_CLASS_SECRET_KEY)])
- self.ldap.add_entry(new_key.entry)
- self.log.debug('imported master key metadata: %s', new_key.entry)
-
- def import_zone_key(self, pubkey, pubkey_data, privkey,
- privkey_wrapped_data, wrapping_mech, master_key_id):
- new_key = self._import_keys_metadata(
- [(pubkey, _ipap11helper.KEY_CLASS_PUBLIC_KEY),
- (privkey, _ipap11helper.KEY_CLASS_PRIVATE_KEY)])
-
- new_key.entry['objectClass'].append('ipaPrivateKeyObject')
- new_key.entry['ipaPrivateKey'] = privkey_wrapped_data
- new_key.entry['ipaWrappingKey'] = 'pkcs11:id=%s;type=secret-key' \
- % uri_escape(master_key_id)
- new_key.entry['ipaWrappingMech'] = wrapping_mech
-
- new_key.entry['objectClass'].append('ipaPublicKeyObject')
- new_key.entry['ipaPublicKey'] = pubkey_data
-
- self.ldap.add_entry(new_key.entry)
- self.log.debug('imported zone key id: 0x%s', hexlify(new_key['ipk11id']))
-
- @property
- def replica_pubkeys_wrap(self):
- if self.cache_replica_pubkeys_wrap:
- return self.cache_replica_pubkeys_wrap
-
- keys = self._filter_replica_keys(
- self._get_key_dict(ReplicaKey,
- '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)(objectClass=ipaPublicKeyObject))'))
-
- self.cache_replica_pubkeys_wrap = keys
- return keys
-
- @property
- def master_keys(self):
- if self.cache_masterkeys:
- return self.cache_masterkeys
-
- keys = self._get_key_dict(MasterKey,
- '(&(objectClass=ipk11SecretKey)(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))(ipk11Label=dnssec-master))')
- for key in keys.values():
- prefix = 'dnssec-master'
- assert key['ipk11label'] == prefix, \
- 'secret key dn="%s" ipk11id=0x%s ipk11label="%s" with ipk11UnWrap = TRUE does not have '\
- '"%s" key label' % (
- key.entry.dn,
- hexlify(key['ipk11id']),
- str(key['ipk11label']),
- prefix)
-
- self.cache_masterkeys = keys
- return keys
-
- @property
- def zone_keypairs(self):
- if self.cache_zone_keypairs:
- return self.cache_zone_keypairs
-
- self.cache_zone_keypairs = self._filter_zone_keys(
- self._get_key_dict(Key,
- '(&(objectClass=ipk11PrivateKey)(objectClass=ipaPrivateKeyObject)(objectClass=ipk11PublicKey)(objectClass=ipaPublicKeyObject))'))
-
- return self.cache_zone_keypairs
-
-if __name__ == '__main__':
- # this is debugging mode
- # print information we think are useful to stdout
- # other garbage goes via logger to stderr
- ipa_log_manager.standard_logging_setup(debug=True)
- log = ipa_log_manager.root_logger
-
- # IPA framework initialization
- ipalib.api.bootstrap(in_server=True, log=None) # no logging to file
- ipalib.api.finalize()
-
- # LDAP initialization
- dns_dn = DN(ipalib.api.env.container_dns, ipalib.api.env.basedn)
- ldap = ipaldap.LDAPClient(ipalib.api.env.ldap_uri)
- log.debug('Connecting to LDAP')
- # GSSAPI will be used, used has to be kinited already
- ldap.gssapi_bind()
- log.debug('Connected')
-
- ldapkeydb = LdapKeyDB(log, ldap, DN(('cn', 'keys'), ('cn', 'sec'),
- ipalib.api.env.container_dns,
- ipalib.api.env.basedn))
-
- print('replica public keys: CKA_WRAP = TRUE')
- print('====================================')
- for pubkey_id, pubkey in ldapkeydb.replica_pubkeys_wrap.items():
- print(hexlify(pubkey_id))
- pprint(pubkey)
-
- print('')
- print('master keys')
- print('===========')
- for mkey_id, mkey in ldapkeydb.master_keys.items():
- print(hexlify(mkey_id))
- pprint(mkey)
-
- print('')
- print('zone key pairs')
- print('==============')
- for key_id, key in ldapkeydb.zone_keypairs.items():
- print(hexlify(key_id))
- pprint(key)