diff options
author | Christian Heimes <cheimes@redhat.com> | 2018-01-31 17:03:31 +0100 |
---|---|---|
committer | Christian Heimes <cheimes@redhat.com> | 2018-02-07 17:27:11 +0100 |
commit | 6f65abfd1156803c047a76ab6c839d1e83d3d3ad (patch) | |
tree | 8606e9816ddd39cc69da4559df50de9906115eda | |
parent | f39d855af446f5a98a16cbd8971d2f2463b019af (diff) | |
download | freeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.tar.gz freeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.tar.xz freeipa-6f65abfd1156803c047a76ab6c839d1e83d3d3ad.zip |
DNSSEC code cleanup
Replace assert with proper check and exception.
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
-rwxr-xr-x | daemons/dnssec/ipa-dnskeysync-replica | 25 | ||||
-rwxr-xr-x | daemons/dnssec/ipa-ods-exporter | 117 | ||||
-rw-r--r-- | ipaserver/dnssec/ldapkeydb.py | 72 |
3 files changed, 134 insertions, 80 deletions
diff --git a/daemons/dnssec/ipa-dnskeysync-replica b/daemons/dnssec/ipa-dnskeysync-replica index ee641ae50..6356aeb65 100755 --- a/daemons/dnssec/ipa-dnskeysync-replica +++ b/daemons/dnssec/ipa-dnskeysync-replica @@ -71,9 +71,12 @@ def ldap2replica_master_keys_sync(ldapkeydb, localhsm): hex_set(new_keys)) for mkey_id in new_keys: mkey_ldap = ldapkeydb.master_keys[mkey_id] - assert mkey_ldap.wrapped_entries, ("Master key 0x%s in LDAP is " \ - "missing key material referenced by ipaSecretKeyRefObject " \ - "attribute") % str_hexlify(mkey_id) + if not mkey_ldap.wrapped_entries: + raise ValueError( + "Master key 0x%s in LDAP is missing key material " + "referenced by ipaSecretKeyRefObject attribute" % + str_hexlify(mkey_id) + ) for wrapped_ldap in mkey_ldap.wrapped_entries: unwrapping_key = find_unwrapping_key( localhsm, wrapped_ldap.single_value['ipaWrappingKey']) @@ -81,9 +84,11 @@ def ldap2replica_master_keys_sync(ldapkeydb, localhsm): break # TODO: Could it happen in normal cases? - assert unwrapping_key is not None, ("Local HSM does not contain " \ - "suitable unwrapping key for master key 0x%s") % \ - str_hexlify(mkey_id) + if unwrapping_key is None: + raise ValueError( + "Local HSM does not contain suitable unwrapping key " + "for master key 0x%s" % str_hexlify(mkey_id) + ) params = ldap2p11helper_api_params(mkey_ldap) params['data'] = wrapped_ldap.single_value['ipaSecretKey'] @@ -114,9 +119,11 @@ def ldap2replica_zone_keys_sync(ldapkeydb, localhsm): zkey_ldap['ipaWrappingKey'], str_hexlify(zkey_id)) unwrapping_key = find_unwrapping_key( localhsm, zkey_ldap['ipaWrappingKey']) - assert unwrapping_key is not None, \ - "Local HSM does not contain suitable unwrapping key for ' \ - 'zone key 0x%s" % str_hexlify(zkey_id) + if unwrapping_key is None: + raise ValueError( + "Local HSM does not contain suitable unwrapping key for " + "zone key 0x%s" % str_hexlify(zkey_id) + ) logger.debug('Importing zone key pair 0x%s', str_hexlify(zkey_id)) localhsm.import_private_key(zkey_ldap, zkey_ldap['ipaPrivateKey'], diff --git a/daemons/dnssec/ipa-ods-exporter b/daemons/dnssec/ipa-ods-exporter index d00dab9de..fb054c9cb 100755 --- a/daemons/dnssec/ipa-ods-exporter +++ b/daemons/dnssec/ipa-ods-exporter @@ -191,7 +191,7 @@ def ods2bind_timestamps(key_state, key_type, ods_times): # idnsSecKeyInactive is ignored for KSK on purpose else: - assert False, "unsupported key type %s" % key_type + raise ValueError("unsupported key type %s" % key_type) # idnsSecKeyDelete is relevant only in DEAD state if 'idnsSecKeyDelete' in ods_times and key_state == KSM_STATE_DEAD: @@ -236,7 +236,8 @@ def get_ods_keys(zone_name): cur = db.execute("SELECT id FROM zones WHERE LOWER(name)=LOWER(?)", (zone_name,)) rows = cur.fetchall() - assert len(rows) == 1, "exactly one DNS zone should exist in ODS DB" + if len(rows) != 1: + raise ValueError("exactly one DNS zone should exist in ODS DB") zone_id = rows[0][0] # get relevant keys for given zone ID: @@ -253,8 +254,8 @@ def get_ods_keys(zone_name): keys = {} for row in cur: key_data = sql2ldap_flags(row['keytype']) - assert key_data.get('idnsSecKeyZONE', None) == 'TRUE', \ - 'unexpected key type 0x%x' % row['keytype'] + if key_data.get('idnsSecKeyZONE') != 'TRUE': + raise ValueError("unexpected key type 0x%x" % row['keytype']) if key_data.get('idnsSecKeySEP', 'FALSE') == 'TRUE': key_type = 'KSK' else: @@ -262,12 +263,16 @@ def get_ods_keys(zone_name): # transform key state to timestamps for BIND with equivalent semantics ods_times = sql2datetimes(row) - key_data.update(ods2bind_timestamps(row['state'], key_type, ods_times)) + key_data.update( + ods2bind_timestamps(row['state'], key_type, ods_times) + ) key_data.update(sql2ldap_algorithm(row['algorithm'])) - key_id = "%s-%s-%s" % (key_type, - datetime2ldap(key_data['idnsSecKeyCreated']), - row['HSMkey_id']) + key_id = "%s-%s-%s" % ( + key_type, + datetime2ldap(key_data['idnsSecKeyCreated']), + row['HSMkey_id'] + ) key_data.update(sql2ldap_keyid(row['HSMkey_id'])) keys[key_id] = key_data @@ -358,12 +363,17 @@ def master2ldap_master_keys_sync(ldapkeydb, localhsm): matching_keys = localhsm.find_keys( uri=wrapped_entry.single_value['ipaWrappingKey']) for matching_key in matching_keys.values(): - assert matching_key['ipk11label'].startswith(u'dnssec-replica:'), \ - 'Wrapped key "%s" refers to PKCS#11 URI "%s" which is ' \ - 'not a know DNSSEC replica key: label "%s" does not start ' \ - 'with "dnssec-replica:" prefix' % (wrapped_entry.dn, - wrapped_entry['ipaWrappingKey'], - matching_key['ipk11label']) + label = matching_key['ipk11label'] + if not label.startswith(u'dnssec-replica:'): + raise ValueError( + "Wrapped key '%s' refers to PKCS#11 URI '%s' which " + "is not a known DNSSEC replica key: label '%s' " + "does not start with 'dnssec-replica:' prefix" % ( + wrapped_entry.dn, + wrapped_entry['ipaWrappingKey'], + label + ) + ) used_replica_keys.add(matching_key['ipk11id']) new_replica_keys = enabled_replica_key_ids - used_replica_keys @@ -396,10 +406,13 @@ def master2ldap_zone_keys_sync(ldapkeydb, localhsm): privkeys_local = localhsm.zone_privkeys logger.debug("zone keys in local HSM: %s", hex_set(privkeys_local)) - assert set(pubkeys_local) == set(privkeys_local), ( + if set(pubkeys_local) != set(privkeys_local): + raise ValueError( "IDs of private and public keys for DNS zones in local HSM does " - "not match to key pairs: %s vs. %s" % - (hex_set(pubkeys_local), hex_set(privkeys_local))) + "not match to key pairs: %s vs. %s" % ( + hex_set(pubkeys_local), hex_set(privkeys_local) + ) + ) new_keys = set(pubkeys_local) - set(keypairs_ldap) logger.debug("new zone keys in local HSM: %s", hex_set(new_keys)) @@ -432,10 +445,13 @@ def master2ldap_zone_keys_purge(ldapkeydb, localhsm): pubkeys_local = localhsm.zone_pubkeys privkeys_local = localhsm.zone_privkeys logger.debug("zone keys in local HSM: %s", hex_set(privkeys_local)) - assert set(pubkeys_local) == set(privkeys_local), \ - "IDs of private and public keys for DNS zones in local HSM does " \ - "not match to key pairs: %s vs. %s" % \ - (hex_set(pubkeys_local), hex_set(privkeys_local)) + if set(pubkeys_local) != set(privkeys_local): + raise ValueError( + "IDs of private and public keys for DNS zones in local HSM does " + "not match to key pairs: %s vs. %s" % ( + hex_set(pubkeys_local), hex_set(privkeys_local) + ) + ) deleted_key_ids = set(keypairs_ldap) - set(pubkeys_local) logger.debug("zone keys deleted from local HSM but present in LDAP: %s", @@ -472,8 +488,8 @@ def receive_systemd_command(): # this implements cmdhandler_handle_cmd() logic cmd = conn.recv(ODS_SE_MAXLINE).strip() - # ODS uses an ASCII protocol - if not isinstance(cmd, six.text_type): + # ODS uses an ASCII protocol, the rest of the code expects str + if six.PY3: cmd = cmd.decode('ascii') logger.debug('received command "%s" from systemd socket', cmd) return cmd, conn @@ -484,44 +500,53 @@ def parse_command(cmd): Exit code None means that execution should continue. """ if cmd == 'ipa-hsm-update': - return (0, - 'HSM synchronization finished, skipping zone synchronization.', - None, - cmd) + return ( + 0, + 'HSM synchronization finished, skipping zone synchronization.', + None, + cmd + ) elif cmd == 'ipa-full-update': - return (None, - 'Synchronization of all zones was finished.', - None, - cmd) + return ( + None, + 'Synchronization of all zones was finished.', + None, + cmd + ) elif cmd.startswith('ldap-cleanup '): zone_name = cmd2ods_zone_name(cmd) - return (None, - 'Zone "%s" metadata will be removed from LDAP.\n' % zone_name, - zone_name, - 'ldap-cleanup') + return ( + None, + 'Zone "%s" metadata will be removed from LDAP.\n' % zone_name, + zone_name, + 'ldap-cleanup' + ) elif cmd.startswith('update '): zone_name = cmd2ods_zone_name(cmd) - return (None, - 'Zone "%s" metadata will be updated in LDAP.\n' % zone_name, - zone_name, - 'update') + return ( + None, + 'Zone "%s" metadata will be updated in LDAP.\n' % zone_name, + zone_name, + 'update' + ) else: - return (0, - 'Command "%s" is not supported by IPA; ' - 'HSM synchronization was finished and the command ' - 'will be ignored.' % cmd, - None, - None) + return ( + 0, + "Command '%s' is not supported by IPA; HSM synchronization was " + "finished and the command will be ignored." % cmd, + None, + None + ) def send_systemd_reply(conn, reply): # Reply & close connection early. # This is necessary to let Enforcer to unlock the ODS DB. - if isinstance(reply, six.text_type): + if six.PY3: reply = reply.encode('ascii') conn.send(reply + b'\n') conn.shutdown(socket.SHUT_RDWR) diff --git a/ipaserver/dnssec/ldapkeydb.py b/ipaserver/dnssec/ldapkeydb.py index 143221cbe..3163e3cd5 100644 --- a/ipaserver/dnssec/ldapkeydb.py +++ b/ipaserver/dnssec/ldapkeydb.py @@ -28,7 +28,8 @@ logger = logging.getLogger(__name__) def uri_escape(val): """convert val to %-notation suitable for ID component in URI""" - assert len(val) > 0, "zero-length URI component detected" + if len(val) == 0: + raise ValueError("zero-length URI component detected") hexval = str_hexlify(val) out = '%' # pylint: disable=E1127 @@ -41,7 +42,8 @@ def ldap_bool(val): elif val == 'FALSE' or val is False: return False else: - raise AssertionError('invalid LDAP boolean "%s"' % val) + raise ValueError('invalid LDAP boolean "%s"' % val) + def get_default_attrs(object_classes): # object class -> default attribute values mapping @@ -103,8 +105,9 @@ def get_default_attrs(object_classes): present_clss.add(cls.lower()) present_clss.intersection_update(set(defaults.keys())) if len(present_clss) <= 0: - raise AssertionError('none of "%s" object classes are supported' % - object_classes) + raise ValueError( + "none of '%s' object classes are supported" % object_classes + ) result = {} for cls in present_clss: @@ -299,16 +302,26 @@ class LdapKeyDB(AbstractHSM): for attr in default_attrs: key.setdefault(attr, default_attrs[attr]) - assert 'ipk11id' in key, \ - 'key is missing ipk11Id in %s' % key.entry.dn + if 'ipk11id' not in key: + raise ValueError( + 'key is missing ipk11Id in %s' % key.entry.dn + ) key_id = key['ipk11id'] - assert key_id not in keys, \ - 'duplicate ipk11Id=0x%s in "%s" and "%s"' % \ - (str_hexlify(key_id), key.entry.dn, keys[key_id].entry.dn) - assert 'ipk11label' in key, \ - 'key "%s" is missing ipk11Label' % key.entry.dn - assert 'objectclass' in key.entry, \ - 'key "%s" is missing objectClass attribute' % key.entry.dn + if key_id in keys: + raise ValueError( + "duplicate ipk11Id=0x%s in '%s' and '%s'" + % (str_hexlify(key_id), key.entry.dn, + keys[key_id].entry.dn) + ) + if 'ipk11label' not in key: + raise ValueError( + "key '%s' is missing ipk11Label" % key.entry.dn + ) + if 'objectclass' not in key.entry: + raise ValueError( + "key '%s' is missing objectClass attribute" + % key.entry.dn + ) keys[key_id] = key @@ -349,7 +362,9 @@ class LdapKeyDB(AbstractHSM): elif pkcs11_class == _ipap11helper.KEY_CLASS_PRIVATE_KEY: entry['objectClass'].append('ipk11PrivateKey') else: - raise AssertionError('unsupported object class %s' % pkcs11_class) + raise ValueError( + "unsupported object class '%s'" % pkcs11_class + ) populate_pkcs11_metadata(source_key, new_key) new_key._cleanup_key() @@ -386,8 +401,12 @@ class LdapKeyDB(AbstractHSM): return self.cache_replica_pubkeys_wrap keys = self._filter_replica_keys( - self._get_key_dict(ReplicaKey, - '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)(objectClass=ipaPublicKeyObject))')) + self._get_key_dict( + ReplicaKey, + '(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)' + '(objectClass=ipaPublicKeyObject))' + ) + ) self.cache_replica_pubkeys_wrap = keys return keys @@ -397,17 +416,20 @@ class LdapKeyDB(AbstractHSM): if self.cache_masterkeys: return self.cache_masterkeys - keys = self._get_key_dict(MasterKey, - '(&(objectClass=ipk11SecretKey)(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))(ipk11Label=dnssec-master))') + keys = self._get_key_dict( + MasterKey, + '(&(objectClass=ipk11SecretKey)' + '(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))' + '(ipk11Label=dnssec-master))' + ) for key in keys.values(): prefix = 'dnssec-master' - assert key['ipk11label'] == prefix, \ - 'secret key dn="%s" ipk11id=0x%s ipk11label="%s" with ' \ - 'ipk11UnWrap = TRUE does not have "%s" key label' % ( - key.entry.dn, - str_hexlify(key['ipk11id']), - str(key['ipk11label']), - prefix + if key['ipk11label'] != prefix: + raise ValueError( + "secret key dn='%s' ipk11id=0x%s ipk11label='%s' with " + "ipk11UnWrap = TRUE does not have '%s' key label'" + % (key.entry.dn, str_hexlify(key['ipk11id']), + str(key['ipk11label']), prefix) ) self.cache_masterkeys = keys |