summaryrefslogtreecommitdiffstats
path: root/ipapython/dnssec/ldapkeydb.py
diff options
context:
space:
mode:
authorJan Cholasta <jcholast@redhat.com>2016-11-22 17:55:10 +0100
committerMartin Basti <mbasti@redhat.com>2016-11-29 14:50:51 +0100
commita1f260d021bf5d018e634438fde6b7c81ebbbcef (patch)
tree29f979f42913f804514f92ada0cb953fdb6d095f /ipapython/dnssec/ldapkeydb.py
parent8e5d2c7014ff6371a3b306e666c301aea1f7a488 (diff)
downloadfreeipa-a1f260d021bf5d018e634438fde6b7c81ebbbcef.tar.gz
freeipa-a1f260d021bf5d018e634438fde6b7c81ebbbcef.tar.xz
freeipa-a1f260d021bf5d018e634438fde6b7c81ebbbcef.zip
ipapython: move dnssec, p11helper and secrets to ipaserver
The dnssec and secrets subpackages and the p11helper module depend on ipaplatform. Move them to ipaserver as they are used only on the server. https://fedorahosted.org/freeipa/ticket/6474 Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
Diffstat (limited to 'ipapython/dnssec/ldapkeydb.py')
-rw-r--r--ipapython/dnssec/ldapkeydb.py450
1 files changed, 0 insertions, 450 deletions
diff --git a/ipapython/dnssec/ldapkeydb.py b/ipapython/dnssec/ldapkeydb.py
deleted file mode 100644
index aa0413934..000000000
--- a/ipapython/dnssec/ldapkeydb.py
+++ /dev/null
@@ -1,450 +0,0 @@
-#
-# Copyright (C) 2014 FreeIPA Contributors see COPYING for license
-#
-
-from __future__ import print_function
-
-from binascii import hexlify
-import collections
-from pprint import pprint
-
-import ipalib
-from ipapython.dn import DN
-from ipapython import ipaldap
-from ipapython import ipa_log_manager
-
-from ipapython.dnssec.abshsm import (
- attrs_name2id,
- AbstractHSM,
- bool_attr_names,
- populate_pkcs11_metadata)
-from ipapython import p11helper as _ipap11helper
-import uuid
-
-def uri_escape(val):
- """convert val to %-notation suitable for ID component in URI"""
- assert len(val) > 0, "zero-length URI component detected"
- hexval = hexlify(val)
- out = '%'
- # pylint: disable=E1127
- out += '%'.join(hexval[i:i+2] for i in range(0, len(hexval), 2))
- return out
-
-def ldap_bool(val):
- if val == 'TRUE' or val is True:
- return True
- elif val == 'FALSE' or val is False:
- return False
- else:
- raise AssertionError('invalid LDAP boolean "%s"' % val)
-
-def get_default_attrs(object_classes):
- # object class -> default attribute values mapping
- defaults = {
- u'ipk11publickey': {
- 'ipk11copyable': True,
- 'ipk11derive': False,
- 'ipk11encrypt': False,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11private': True,
- 'ipk11trusted': False,
- 'ipk11verify': True,
- 'ipk11verifyrecover': True,
- 'ipk11wrap': False
- },
- u'ipk11privatekey': {
- 'ipk11alwaysauthenticate': False,
- 'ipk11alwayssensitive': True,
- 'ipk11copyable': True,
- 'ipk11decrypt': False,
- 'ipk11derive': False,
- 'ipk11extractable': True,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11neverextractable': False,
- 'ipk11private': True,
- 'ipk11sensitive': True,
- 'ipk11sign': True,
- 'ipk11signrecover': True,
- 'ipk11unwrap': False,
- 'ipk11wrapwithtrusted': False
- },
- u'ipk11secretkey': {
- 'ipk11alwaysauthenticate': False,
- 'ipk11alwayssensitive': True,
- 'ipk11copyable': True,
- 'ipk11decrypt': False,
- 'ipk11derive': False,
- 'ipk11encrypt': False,
- 'ipk11extractable': True,
- 'ipk11local': True,
- 'ipk11modifiable': True,
- 'ipk11neverextractable': False,
- 'ipk11private': True,
- 'ipk11sensitive': True,
- 'ipk11sign': False,
- 'ipk11trusted': False,
- 'ipk11unwrap': True,
- 'ipk11verify': False,
- 'ipk11wrap': True,
- 'ipk11wrapwithtrusted': False
- }
- }
-
- # get set of supported object classes
- present_clss = set()
- for cls in object_classes:
- present_clss.add(cls.lower())
- present_clss.intersection_update(set(defaults.keys()))
- if len(present_clss) <= 0:
- raise AssertionError('none of "%s" object classes are supported' %
- object_classes)
-
- result = {}
- for cls in present_clss:
- result.update(defaults[cls])
- return result
-
-
-class Key(collections.MutableMapping):
- """abstraction to hide LDAP entry weirdnesses:
- - non-normalized attribute names
- - boolean attributes returned as strings
- - planned entry deletion prevents subsequent use of the instance
- """
- def __init__(self, entry, ldap, ldapkeydb):
- self.entry = entry
- self._delentry = None # indicates that object was deleted
- self.ldap = ldap
- self.ldapkeydb = ldapkeydb
- self.log = ldap.log.getChild(__name__)
-
- def __assert_not_deleted(self):
- assert self.entry and not self._delentry, (
- "attempt to use to-be-deleted entry %s detected"
- % self._delentry.dn)
-
- def __getitem__(self, key):
- self.__assert_not_deleted()
- val = self.entry.single_value[key]
- if key.lower() in bool_attr_names:
- val = ldap_bool(val)
- return val
-
- def __setitem__(self, key, value):
- self.__assert_not_deleted()
- self.entry[key] = value
-
- def __delitem__(self, key):
- self.__assert_not_deleted()
- del self.entry[key]
-
- def __iter__(self):
- """generates list of ipa names of all PKCS#11 attributes present in the object"""
- self.__assert_not_deleted()
- for ipa_name in list(self.entry.keys()):
- lowercase = ipa_name.lower()
- if lowercase in attrs_name2id:
- yield lowercase
-
- def __len__(self):
- self.__assert_not_deleted()
- return len(self.entry)
-
- def __repr__(self):
- if self._delentry:
- return 'deleted entry: %s' % repr(self._delentry)
-
- sanitized = dict(self.entry)
- for attr in ['ipaPrivateKey', 'ipaPublicKey', 'ipk11publickeyinfo']:
- if attr in sanitized:
- del sanitized[attr]
- return repr(sanitized)
-
- def _cleanup_key(self):
- """remove default values from LDAP entry"""
- default_attrs = get_default_attrs(self.entry['objectclass'])
- empty = object()
- for attr in default_attrs:
- if self.get(attr, empty) == default_attrs[attr]:
- del self[attr]
-
- def _update_key(self):
- """remove default values from LDAP entry and write back changes"""
- if self._delentry:
- self._delete_key()
- return
-
- self._cleanup_key()
-
- try:
- self.ldap.update_entry(self.entry)
- except ipalib.errors.EmptyModlist:
- pass
-
- def _delete_key(self):
- """remove key metadata entry from LDAP
-
- After calling this, the python object is no longer valid and all
- subsequent method calls on it will fail.
- """
- assert not self.entry, (
- "Key._delete_key() called before Key.schedule_deletion()")
- assert self._delentry, "Key._delete_key() called more than once"
- self.log.debug('deleting key id 0x%s DN %s from LDAP',
- hexlify(self._delentry.single_value['ipk11id']),
- self._delentry.dn)
- self.ldap.delete_entry(self._delentry)
- self._delentry = None
- self.ldap = None
- self.ldapkeydb = None
-
- def schedule_deletion(self):
- """schedule key deletion from LDAP
-
- Calling schedule_deletion() will make this object incompatible with
- normal Key. After that the object must not be read or modified.
- Key metadata will be actually deleted when LdapKeyDB.flush() is called.
- """
- assert not self._delentry, (
- "Key.schedule_deletion() called more than once")
- self._delentry = self.entry
- self.entry = None
-
-
-class ReplicaKey(Key):
- # TODO: object class assert
- def __init__(self, entry, ldap, ldapkeydb):
- super(ReplicaKey, self).__init__(entry, ldap, ldapkeydb)
-
-class MasterKey(Key):
- # TODO: object class assert
- def __init__(self, entry, ldap, ldapkeydb):
- super(MasterKey, self).__init__(entry, ldap, ldapkeydb)
-
- @property
- def wrapped_entries(self):
- """LDAP entires with wrapped data
-
- One entry = one blob + ipaWrappingKey pointer to unwrapping key"""
-
- keys = []
- if 'ipaSecretKeyRef' not in self.entry:
- return keys
-
- for dn in self.entry['ipaSecretKeyRef']:
- try:
- obj = self.ldap.get_entry(dn)
- keys.append(obj)
- except ipalib.errors.NotFound:
- continue
-
- return keys
-
- def add_wrapped_data(self, data, wrapping_mech, replica_key_id):
- wrapping_key_uri = 'pkcs11:id=%s;type=public' \
- % uri_escape(replica_key_id)
- # TODO: replace this with 'autogenerate' to prevent collisions
- uuid_rdn = DN('ipk11UniqueId=%s' % uuid.uuid1())
- entry_dn = DN(uuid_rdn, self.ldapkeydb.base_dn)
- entry = self.ldap.make_entry(entry_dn,
- objectClass=['ipaSecretKeyObject', 'ipk11Object'],
- ipaSecretKey=data,
- ipaWrappingKey=wrapping_key_uri,
- ipaWrappingMech=wrapping_mech)
-
- self.log.info('adding master key 0x%s wrapped with replica key 0x%s to %s',
- hexlify(self['ipk11id']),
- hexlify(replica_key_id),
- entry_dn)
- self.ldap.add_entry(entry)
- if 'ipaSecretKeyRef' not in self.entry:
- self.entry['objectClass'] += ['ipaSecretKeyRefObject']
- self.entry.setdefault('ipaSecretKeyRef', []).append(entry_dn)
-
-
-class LdapKeyDB(AbstractHSM):
- def __init__(self, log, ldap, base_dn):
- self.ldap = ldap
- self.base_dn = base_dn
- self.log = log
- self.cache_replica_pubkeys_wrap = None
- self.cache_masterkeys = None
- self.cache_zone_keypairs = None
-
- def _get_key_dict(self, key_type, ldap_filter):
- try:
- objs = self.ldap.get_entries(base_dn=self.base_dn,
- filter=ldap_filter)
- except ipalib.errors.NotFound:
- return {}
-
- keys = {}
- for o in objs:
- # add default values not present in LDAP
- key = key_type(o, self.ldap, self)
- default_attrs = get_default_attrs(key.entry['objectclass'])
- for attr in default_attrs:
- key.setdefault(attr, default_attrs[attr])
-
- assert 'ipk11id' in key, 'key is missing ipk11Id in %s' % key.entry.dn
- key_id = key['ipk11id']
- assert key_id not in keys, 'duplicate ipk11Id=0x%s in "%s" and "%s"' % (hexlify(key_id), key.entry.dn, keys[key_id].entry.dn)
- assert 'ipk11label' in key, 'key "%s" is missing ipk11Label' % key.entry.dn
- assert 'objectclass' in key.entry, 'key "%s" is missing objectClass attribute' % key.entry.dn
-
- keys[key_id] = key
-
- self._update_keys()
- return keys
-
- def _update_keys(self):
- for cache in [self.cache_masterkeys, self.cache_replica_pubkeys_wrap,
- self.cache_zone_keypairs]:
- if cache:
- for key in cache.values():
- key._update_key()
-
- def flush(self):
- """write back content of caches to LDAP"""
- self._update_keys()
- self.cache_masterkeys = None
- self.cache_replica_pubkeys_wrap = None
- self.cache_zone_keypairs = None
-
- def _import_keys_metadata(self, source_keys):
- """import key metadata from Key-compatible objects
-
- metadata from multiple source keys can be imported into single LDAP
- object
-
- :param: source_keys is iterable of (Key object, PKCS#11 object class)"""
-
- entry_dn = DN('ipk11UniqueId=autogenerate', self.base_dn)
- entry = self.ldap.make_entry(entry_dn, objectClass=['ipk11Object'])
- new_key = Key(entry, self.ldap, self)
-
- for source_key, pkcs11_class in source_keys:
- if pkcs11_class == _ipap11helper.KEY_CLASS_SECRET_KEY:
- entry['objectClass'].append('ipk11SecretKey')
- elif pkcs11_class == _ipap11helper.KEY_CLASS_PUBLIC_KEY:
- entry['objectClass'].append('ipk11PublicKey')
- elif pkcs11_class == _ipap11helper.KEY_CLASS_PRIVATE_KEY:
- entry['objectClass'].append('ipk11PrivateKey')
- else:
- raise AssertionError('unsupported object class %s' % pkcs11_class)
-
- populate_pkcs11_metadata(source_key, new_key)
- new_key._cleanup_key()
- return new_key
-
- def import_master_key(self, mkey):
- new_key = self._import_keys_metadata(
- [(mkey, _ipap11helper.KEY_CLASS_SECRET_KEY)])
- self.ldap.add_entry(new_key.entry)
- self.log.debug('imported master key metadata: %s', new_key.entry)
-
- def import_zone_key(self, pubkey, pubkey_data, privkey,
- privkey_wrapped_data, wrapping_mech, master_key_id):
- new_key = self._import_keys_metadata(
- [(pubkey, _ipap11helper.KEY_CLASS_PUBLIC_KEY),
- (privkey, _ipap11helper.KEY_CLASS_PRIVATE_KEY)])
-
- new_key.entry['objectClass'].append('ipaPrivateKeyObject')
- new_key.entry['ipaPrivateKey'] = privkey_wrapped_data
- new_key.entry['ipaWrappingKey'] = 'pkcs11:id=%s;type=secret-key' \
- % uri_escape(master_key_id)
- new_key.entry['ipaWrappingMech'] = wrapping_mech
-
- new_key.entry['objectClass'].append('ipaPublicKeyObject')
- new_key.entry['ipaPublicKey'] = pubkey_data
-
- self.ldap.add_entry(new_key.entry)
- self.log.debug('imported zone key id: 0x%s', hexlify(new_key['ipk11id']))
-
- @property
- def replica_pubkeys_wrap(self):
- if self.cache_replica_pubkeys_wrap:
- return self.cache_replica_pubkeys_wrap
-
- keys = self._filter_replica_keys(
- self._get_key_dict(ReplicaKey,
- '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)(objectClass=ipaPublicKeyObject))'))
-
- self.cache_replica_pubkeys_wrap = keys
- return keys
-
- @property
- def master_keys(self):
- if self.cache_masterkeys:
- return self.cache_masterkeys
-
- keys = self._get_key_dict(MasterKey,
- '(&(objectClass=ipk11SecretKey)(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))(ipk11Label=dnssec-master))')
- for key in keys.values():
- prefix = 'dnssec-master'
- assert key['ipk11label'] == prefix, \
- 'secret key dn="%s" ipk11id=0x%s ipk11label="%s" with ipk11UnWrap = TRUE does not have '\
- '"%s" key label' % (
- key.entry.dn,
- hexlify(key['ipk11id']),
- str(key['ipk11label']),
- prefix)
-
- self.cache_masterkeys = keys
- return keys
-
- @property
- def zone_keypairs(self):
- if self.cache_zone_keypairs:
- return self.cache_zone_keypairs
-
- self.cache_zone_keypairs = self._filter_zone_keys(
- self._get_key_dict(Key,
- '(&(objectClass=ipk11PrivateKey)(objectClass=ipaPrivateKeyObject)(objectClass=ipk11PublicKey)(objectClass=ipaPublicKeyObject))'))
-
- return self.cache_zone_keypairs
-
-if __name__ == '__main__':
- # this is debugging mode
- # print information we think are useful to stdout
- # other garbage goes via logger to stderr
- ipa_log_manager.standard_logging_setup(debug=True)
- log = ipa_log_manager.root_logger
-
- # IPA framework initialization
- ipalib.api.bootstrap(in_server=True, log=None) # no logging to file
- ipalib.api.finalize()
-
- # LDAP initialization
- dns_dn = DN(ipalib.api.env.container_dns, ipalib.api.env.basedn)
- ldap = ipaldap.LDAPClient(ipalib.api.env.ldap_uri)
- log.debug('Connecting to LDAP')
- # GSSAPI will be used, used has to be kinited already
- ldap.gssapi_bind()
- log.debug('Connected')
-
- ldapkeydb = LdapKeyDB(log, ldap, DN(('cn', 'keys'), ('cn', 'sec'),
- ipalib.api.env.container_dns,
- ipalib.api.env.basedn))
-
- print('replica public keys: CKA_WRAP = TRUE')
- print('====================================')
- for pubkey_id, pubkey in ldapkeydb.replica_pubkeys_wrap.items():
- print(hexlify(pubkey_id))
- pprint(pubkey)
-
- print('')
- print('master keys')
- print('===========')
- for mkey_id, mkey in ldapkeydb.master_keys.items():
- print(hexlify(mkey_id))
- pprint(mkey)
-
- print('')
- print('zone key pairs')
- print('==============')
- for key_id, key in ldapkeydb.zone_keypairs.items():
- print(hexlify(key_id))
- pprint(key)