summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChristian Heimes <cheimes@redhat.com>2018-01-31 17:03:31 +0100
committerChristian Heimes <cheimes@redhat.com>2018-02-07 17:27:11 +0100
commit6f65abfd1156803c047a76ab6c839d1e83d3d3ad (patch)
tree8606e9816ddd39cc69da4559df50de9906115eda
parentf39d855af446f5a98a16cbd8971d2f2463b019af (diff)
downloadfreeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.tar.gz
freeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.tar.xz
freeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.zip
DNSSEC code cleanup
Replace assert with proper check and exception. Signed-off-by: Christian Heimes <cheimes@redhat.com> Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
-rwxr-xr-xdaemons/dnssec/ipa-dnskeysync-replica25
-rwxr-xr-xdaemons/dnssec/ipa-ods-exporter117
-rw-r--r--ipaserver/dnssec/ldapkeydb.py72
3 files changed, 134 insertions, 80 deletions
diff --git a/daemons/dnssec/ipa-dnskeysync-replica b/daemons/dnssec/ipa-dnskeysync-replica
index ee641ae50..6356aeb65 100755
--- a/daemons/dnssec/ipa-dnskeysync-replica
+++ b/daemons/dnssec/ipa-dnskeysync-replica
@@ -71,9 +71,12 @@ def ldap2replica_master_keys_sync(ldapkeydb, localhsm):
hex_set(new_keys))
for mkey_id in new_keys:
mkey_ldap = ldapkeydb.master_keys[mkey_id]
- assert mkey_ldap.wrapped_entries, ("Master key 0x%s in LDAP is " \
- "missing key material referenced by ipaSecretKeyRefObject " \
- "attribute") % str_hexlify(mkey_id)
+ if not mkey_ldap.wrapped_entries:
+ raise ValueError(
+ "Master key 0x%s in LDAP is missing key material "
+ "referenced by ipaSecretKeyRefObject attribute" %
+ str_hexlify(mkey_id)
+ )
for wrapped_ldap in mkey_ldap.wrapped_entries:
unwrapping_key = find_unwrapping_key(
localhsm, wrapped_ldap.single_value['ipaWrappingKey'])
@@ -81,9 +84,11 @@ def ldap2replica_master_keys_sync(ldapkeydb, localhsm):
break
# TODO: Could it happen in normal cases?
- assert unwrapping_key is not None, ("Local HSM does not contain " \
- "suitable unwrapping key for master key 0x%s") % \
- str_hexlify(mkey_id)
+ if unwrapping_key is None:
+ raise ValueError(
+ "Local HSM does not contain suitable unwrapping key "
+ "for master key 0x%s" % str_hexlify(mkey_id)
+ )
params = ldap2p11helper_api_params(mkey_ldap)
params['data'] = wrapped_ldap.single_value['ipaSecretKey']
@@ -114,9 +119,11 @@ def ldap2replica_zone_keys_sync(ldapkeydb, localhsm):
zkey_ldap['ipaWrappingKey'], str_hexlify(zkey_id))
unwrapping_key = find_unwrapping_key(
localhsm, zkey_ldap['ipaWrappingKey'])
- assert unwrapping_key is not None, \
- "Local HSM does not contain suitable unwrapping key for ' \
- 'zone key 0x%s" % str_hexlify(zkey_id)
+ if unwrapping_key is None:
+ raise ValueError(
+ "Local HSM does not contain suitable unwrapping key for "
+ "zone key 0x%s" % str_hexlify(zkey_id)
+ )
logger.debug('Importing zone key pair 0x%s', str_hexlify(zkey_id))
localhsm.import_private_key(zkey_ldap, zkey_ldap['ipaPrivateKey'],
diff --git a/daemons/dnssec/ipa-ods-exporter b/daemons/dnssec/ipa-ods-exporter
index d00dab9de..fb054c9cb 100755
--- a/daemons/dnssec/ipa-ods-exporter
+++ b/daemons/dnssec/ipa-ods-exporter
@@ -191,7 +191,7 @@ def ods2bind_timestamps(key_state, key_type, ods_times):
# idnsSecKeyInactive is ignored for KSK on purpose
else:
- assert False, "unsupported key type %s" % key_type
+ raise ValueError("unsupported key type %s" % key_type)
# idnsSecKeyDelete is relevant only in DEAD state
if 'idnsSecKeyDelete' in ods_times and key_state == KSM_STATE_DEAD:
@@ -236,7 +236,8 @@ def get_ods_keys(zone_name):
cur = db.execute("SELECT id FROM zones WHERE LOWER(name)=LOWER(?)",
(zone_name,))
rows = cur.fetchall()
- assert len(rows) == 1, "exactly one DNS zone should exist in ODS DB"
+ if len(rows) != 1:
+ raise ValueError("exactly one DNS zone should exist in ODS DB")
zone_id = rows[0][0]
# get relevant keys for given zone ID:
@@ -253,8 +254,8 @@ def get_ods_keys(zone_name):
keys = {}
for row in cur:
key_data = sql2ldap_flags(row['keytype'])
- assert key_data.get('idnsSecKeyZONE', None) == 'TRUE', \
- 'unexpected key type 0x%x' % row['keytype']
+ if key_data.get('idnsSecKeyZONE') != 'TRUE':
+ raise ValueError("unexpected key type 0x%x" % row['keytype'])
if key_data.get('idnsSecKeySEP', 'FALSE') == 'TRUE':
key_type = 'KSK'
else:
@@ -262,12 +263,16 @@ def get_ods_keys(zone_name):
# transform key state to timestamps for BIND with equivalent semantics
ods_times = sql2datetimes(row)
- key_data.update(ods2bind_timestamps(row['state'], key_type, ods_times))
+ key_data.update(
+ ods2bind_timestamps(row['state'], key_type, ods_times)
+ )
key_data.update(sql2ldap_algorithm(row['algorithm']))
- key_id = "%s-%s-%s" % (key_type,
- datetime2ldap(key_data['idnsSecKeyCreated']),
- row['HSMkey_id'])
+ key_id = "%s-%s-%s" % (
+ key_type,
+ datetime2ldap(key_data['idnsSecKeyCreated']),
+ row['HSMkey_id']
+ )
key_data.update(sql2ldap_keyid(row['HSMkey_id']))
keys[key_id] = key_data
@@ -358,12 +363,17 @@ def master2ldap_master_keys_sync(ldapkeydb, localhsm):
matching_keys = localhsm.find_keys(
uri=wrapped_entry.single_value['ipaWrappingKey'])
for matching_key in matching_keys.values():
- assert matching_key['ipk11label'].startswith(u'dnssec-replica:'), \
- 'Wrapped key "%s" refers to PKCS#11 URI "%s" which is ' \
- 'not a know DNSSEC replica key: label "%s" does not start ' \
- 'with "dnssec-replica:" prefix' % (wrapped_entry.dn,
- wrapped_entry['ipaWrappingKey'],
- matching_key['ipk11label'])
+ label = matching_key['ipk11label']
+ if not label.startswith(u'dnssec-replica:'):
+ raise ValueError(
+ "Wrapped key '%s' refers to PKCS#11 URI '%s' which "
+ "is not a known DNSSEC replica key: label '%s' "
+ "does not start with 'dnssec-replica:' prefix" % (
+ wrapped_entry.dn,
+ wrapped_entry['ipaWrappingKey'],
+ label
+ )
+ )
used_replica_keys.add(matching_key['ipk11id'])
new_replica_keys = enabled_replica_key_ids - used_replica_keys
@@ -396,10 +406,13 @@ def master2ldap_zone_keys_sync(ldapkeydb, localhsm):
privkeys_local = localhsm.zone_privkeys
logger.debug("zone keys in local HSM: %s", hex_set(privkeys_local))
- assert set(pubkeys_local) == set(privkeys_local), (
+ if set(pubkeys_local) != set(privkeys_local):
+ raise ValueError(
"IDs of private and public keys for DNS zones in local HSM does "
- "not match to key pairs: %s vs. %s" %
- (hex_set(pubkeys_local), hex_set(privkeys_local)))
+ "not match to key pairs: %s vs. %s" % (
+ hex_set(pubkeys_local), hex_set(privkeys_local)
+ )
+ )
new_keys = set(pubkeys_local) - set(keypairs_ldap)
logger.debug("new zone keys in local HSM: %s", hex_set(new_keys))
@@ -432,10 +445,13 @@ def master2ldap_zone_keys_purge(ldapkeydb, localhsm):
pubkeys_local = localhsm.zone_pubkeys
privkeys_local = localhsm.zone_privkeys
logger.debug("zone keys in local HSM: %s", hex_set(privkeys_local))
- assert set(pubkeys_local) == set(privkeys_local), \
- "IDs of private and public keys for DNS zones in local HSM does " \
- "not match to key pairs: %s vs. %s" % \
- (hex_set(pubkeys_local), hex_set(privkeys_local))
+ if set(pubkeys_local) != set(privkeys_local):
+ raise ValueError(
+ "IDs of private and public keys for DNS zones in local HSM does "
+ "not match to key pairs: %s vs. %s" % (
+ hex_set(pubkeys_local), hex_set(privkeys_local)
+ )
+ )
deleted_key_ids = set(keypairs_ldap) - set(pubkeys_local)
logger.debug("zone keys deleted from local HSM but present in LDAP: %s",
@@ -472,8 +488,8 @@ def receive_systemd_command():
# this implements cmdhandler_handle_cmd() logic
cmd = conn.recv(ODS_SE_MAXLINE).strip()
- # ODS uses an ASCII protocol
- if not isinstance(cmd, six.text_type):
+ # ODS uses an ASCII protocol, the rest of the code expects str
+ if six.PY3:
cmd = cmd.decode('ascii')
logger.debug('received command "%s" from systemd socket', cmd)
return cmd, conn
@@ -484,44 +500,53 @@ def parse_command(cmd):
Exit code None means that execution should continue.
"""
if cmd == 'ipa-hsm-update':
- return (0,
- 'HSM synchronization finished, skipping zone synchronization.',
- None,
- cmd)
+ return (
+ 0,
+ 'HSM synchronization finished, skipping zone synchronization.',
+ None,
+ cmd
+ )
elif cmd == 'ipa-full-update':
- return (None,
- 'Synchronization of all zones was finished.',
- None,
- cmd)
+ return (
+ None,
+ 'Synchronization of all zones was finished.',
+ None,
+ cmd
+ )
elif cmd.startswith('ldap-cleanup '):
zone_name = cmd2ods_zone_name(cmd)
- return (None,
- 'Zone "%s" metadata will be removed from LDAP.\n' % zone_name,
- zone_name,
- 'ldap-cleanup')
+ return (
+ None,
+ 'Zone "%s" metadata will be removed from LDAP.\n' % zone_name,
+ zone_name,
+ 'ldap-cleanup'
+ )
elif cmd.startswith('update '):
zone_name = cmd2ods_zone_name(cmd)
- return (None,
- 'Zone "%s" metadata will be updated in LDAP.\n' % zone_name,
- zone_name,
- 'update')
+ return (
+ None,
+ 'Zone "%s" metadata will be updated in LDAP.\n' % zone_name,
+ zone_name,
+ 'update'
+ )
else:
- return (0,
- 'Command "%s" is not supported by IPA; '
- 'HSM synchronization was finished and the command '
- 'will be ignored.' % cmd,
- None,
- None)
+ return (
+ 0,
+ "Command '%s' is not supported by IPA; HSM synchronization was "
+ "finished and the command will be ignored." % cmd,
+ None,
+ None
+ )
def send_systemd_reply(conn, reply):
# Reply & close connection early.
# This is necessary to let Enforcer to unlock the ODS DB.
- if isinstance(reply, six.text_type):
+ if six.PY3:
reply = reply.encode('ascii')
conn.send(reply + b'\n')
conn.shutdown(socket.SHUT_RDWR)
diff --git a/ipaserver/dnssec/ldapkeydb.py b/ipaserver/dnssec/ldapkeydb.py
index 143221cbe..3163e3cd5 100644
--- a/ipaserver/dnssec/ldapkeydb.py
+++ b/ipaserver/dnssec/ldapkeydb.py
@@ -28,7 +28,8 @@ logger = logging.getLogger(__name__)
def uri_escape(val):
"""convert val to %-notation suitable for ID component in URI"""
- assert len(val) > 0, "zero-length URI component detected"
+ if len(val) == 0:
+ raise ValueError("zero-length URI component detected")
hexval = str_hexlify(val)
out = '%'
# pylint: disable=E1127
@@ -41,7 +42,8 @@ def ldap_bool(val):
elif val == 'FALSE' or val is False:
return False
else:
- raise AssertionError('invalid LDAP boolean "%s"' % val)
+ raise ValueError('invalid LDAP boolean "%s"' % val)
+
def get_default_attrs(object_classes):
# object class -> default attribute values mapping
@@ -103,8 +105,9 @@ def get_default_attrs(object_classes):
present_clss.add(cls.lower())
present_clss.intersection_update(set(defaults.keys()))
if len(present_clss) <= 0:
- raise AssertionError('none of "%s" object classes are supported' %
- object_classes)
+ raise ValueError(
+ "none of '%s' object classes are supported" % object_classes
+ )
result = {}
for cls in present_clss:
@@ -299,16 +302,26 @@ class LdapKeyDB(AbstractHSM):
for attr in default_attrs:
key.setdefault(attr, default_attrs[attr])
- assert 'ipk11id' in key, \
- 'key is missing ipk11Id in %s' % key.entry.dn
+ if 'ipk11id' not in key:
+ raise ValueError(
+ 'key is missing ipk11Id in %s' % key.entry.dn
+ )
key_id = key['ipk11id']
- assert key_id not in keys, \
- 'duplicate ipk11Id=0x%s in "%s" and "%s"' % \
- (str_hexlify(key_id), key.entry.dn, keys[key_id].entry.dn)
- assert 'ipk11label' in key, \
- 'key "%s" is missing ipk11Label' % key.entry.dn
- assert 'objectclass' in key.entry, \
- 'key "%s" is missing objectClass attribute' % key.entry.dn
+ if key_id in keys:
+ raise ValueError(
+ "duplicate ipk11Id=0x%s in '%s' and '%s'"
+ % (str_hexlify(key_id), key.entry.dn,
+ keys[key_id].entry.dn)
+ )
+ if 'ipk11label' not in key:
+ raise ValueError(
+ "key '%s' is missing ipk11Label" % key.entry.dn
+ )
+ if 'objectclass' not in key.entry:
+ raise ValueError(
+ "key '%s' is missing objectClass attribute"
+ % key.entry.dn
+ )
keys[key_id] = key
@@ -349,7 +362,9 @@ class LdapKeyDB(AbstractHSM):
elif pkcs11_class == _ipap11helper.KEY_CLASS_PRIVATE_KEY:
entry['objectClass'].append('ipk11PrivateKey')
else:
- raise AssertionError('unsupported object class %s' % pkcs11_class)
+ raise ValueError(
+ "unsupported object class '%s'" % pkcs11_class
+ )
populate_pkcs11_metadata(source_key, new_key)
new_key._cleanup_key()
@@ -386,8 +401,12 @@ class LdapKeyDB(AbstractHSM):
return self.cache_replica_pubkeys_wrap
keys = self._filter_replica_keys(
- self._get_key_dict(ReplicaKey,
- '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)(objectClass=ipaPublicKeyObject))'))
+ self._get_key_dict(
+ ReplicaKey,
+ '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)'
+ '(objectClass=ipaPublicKeyObject))'
+ )
+ )
self.cache_replica_pubkeys_wrap = keys
return keys
@@ -397,17 +416,20 @@ class LdapKeyDB(AbstractHSM):
if self.cache_masterkeys:
return self.cache_masterkeys
- keys = self._get_key_dict(MasterKey,
- '(&(objectClass=ipk11SecretKey)(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))(ipk11Label=dnssec-master))')
+ keys = self._get_key_dict(
+ MasterKey,
+ '(&(objectClass=ipk11SecretKey)'
+ '(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))'
+ '(ipk11Label=dnssec-master))'
+ )
for key in keys.values():
prefix = 'dnssec-master'
- assert key['ipk11label'] == prefix, \
- 'secret key dn="%s" ipk11id=0x%s ipk11label="%s" with ' \
- 'ipk11UnWrap = TRUE does not have "%s" key label' % (
- key.entry.dn,
- str_hexlify(key['ipk11id']),
- str(key['ipk11label']),
- prefix
+ if key['ipk11label'] != prefix:
+ raise ValueError(
+ "secret key dn='%s' ipk11id=0x%s ipk11label='%s' with "
+ "ipk11UnWrap = TRUE does not have '%s' key label'"
+ % (key.entry.dn, str_hexlify(key['ipk11id']),
+ str(key['ipk11label']), prefix)
)
self.cache_masterkeys = keys