diff options
Diffstat (limited to 'base/common/python')
| -rw-r--r-- | base/common/python/pki/crypto.py | 51 | ||||
| -rw-r--r-- | base/common/python/pki/key.py | 56 |
2 files changed, 95 insertions, 12 deletions
diff --git a/base/common/python/pki/crypto.py b/base/common/python/pki/crypto.py index b767abd2e..0891acd68 100644 --- a/base/common/python/pki/crypto.py +++ b/base/common/python/pki/crypto.py @@ -34,10 +34,21 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import ( Cipher, algorithms, modes ) +from cryptography.hazmat.primitives import keywrap from cryptography.hazmat.primitives import padding from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 import cryptography.x509 +# encryption algorithms OIDs +DES_EDE3_CBC_OID = "{1 2 840 113549 3 7}" +AES_128_CBC_OID = "{2 16 840 1 101 3 4 1 2}" + +# Wrap Algorithm names as defined by JSS. +WRAP_AES_CBC_PAD = "AES/CBC/PKCS5Padding" +WRAP_AES_KEY_WRAP = "AES KeyWrap" +WRAP_AES_KEY_WRAP_PAD = "AES KeyWrap/Padding" +WRAP_DES3_CBC_PAD = "DES3/CBC/Pad" + class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): """ @@ -96,7 +107,11 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): DES3 key. """ - # abc.abstractmethod + @abc.abstractmethod + def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): + """ Unwrap data that has been key wrapped using AES KeyWrap """ + + @abc.abstractmethod def get_cert(self, cert_nick): """ Get the certificate for the specified cert_nick. """ @@ -302,6 +317,18 @@ class NSSCryptoProvider(CryptoProvider): public_key = wrapping_cert.subject_public_key_info.public_key return nss.pub_wrap_sym_key(mechanism, public_key, data) + def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): + """ + :param mechanism Key wrapping mechanism + :param data: Data to be unwrapped + :param wrapping_key: Wrapping Key + :param nonce_iv Nonce data + :return: Unwrapped data + + Return unwrapped data for data wrapped using AES KeyWrap + """ + raise NotImplementedError() + def get_cert(self, cert_nick): """ :param cert_nick Nickname for the certificate to be returned @@ -461,6 +488,28 @@ class CryptographyCryptoProvider(CryptoProvider): PKCS1v15() ) + def key_unwrap(self, mechanism, data, wrapping_key, nonce_iv): + """ + :param mechanism key wrapping mechanism + :param data: data to unwrap + :param wrapping_key: AES key used to wrap data + :param nonce_iv Nonce data + :return: unwrapped data + + Unwrap the encrypted data which has been wrapped using a + KeyWrap mechanism. + """ + if mechanism == WRAP_AES_CBC_PAD or mechanism == WRAP_DES3_CBC_PAD: + return self.symmetric_unwrap( + data, + wrapping_key, + nonce_iv=nonce_iv) + + if mechanism == WRAP_AES_KEY_WRAP: + return keywrap.aes_key_unwrap(wrapping_key, data, self.backend) + + raise ValueError("Unsupported key wrap algorithm: " + mechanism) + def get_cert(self, cert_nick): """ :param cert_nick Nickname for the certificate to be returned. diff --git a/base/common/python/pki/key.py b/base/common/python/pki/key.py index 6c5641a0c..e782d54c0 100644 --- a/base/common/python/pki/key.py +++ b/base/common/python/pki/key.py @@ -33,6 +33,7 @@ from six import iteritems from six.moves.urllib.parse import quote # pylint: disable=F0401,E0611 import pki +import pki.crypto import pki.encoder as encoder from pki.info import Version import pki.util @@ -459,10 +460,6 @@ class KeyClient(object): RSA_ALGORITHM = "RSA" DSA_ALGORITHM = "DSA" - # default session key wrapping algorithm - DES_EDE3_CBC_OID = "{1 2 840 113549 3 7}" - AES_128_CBC_OID = "{2 16 840 1 101 3 4 1 2}" - def __init__(self, connection, crypto, transport_cert_nick=None, info_client=None): """ Constructor """ @@ -481,6 +478,7 @@ class KeyClient(object): self.info_client = info_client self.encrypt_alg_oid = None + self.wrap_name = None self.set_crypto_algorithms() def set_transport_cert(self, transport_cert_nick): @@ -502,9 +500,14 @@ class KeyClient(object): # set keyset related constants needed in KeyClient if keyset_id == 0: - self.encrypt_alg_oid = self.DES_EDE3_CBC_OID + self.encrypt_alg_oid = pki.crypto.DES_EDE3_CBC_OID + self.wrap_name = pki.crypto.WRAP_DES3_CBC_PAD else: - self.encrypt_alg_oid = self.AES_128_CBC_OID + self.encrypt_alg_oid = pki.crypto.AES_128_CBC_OID + # Note: AES_KEY_WRAP_PAD is not yet supported by + # python cryptography. Therefore we will default + # to AES_CBC_PAD instead + self.wrap_name = pki.crypto.WRAP_AES_CBC_PAD def get_client_keyset(self): # get client keyset @@ -847,7 +850,7 @@ class KeyClient(object): raise TypeError('Missing wrapped session key') if not algorithm_oid: - algorithm_oid = KeyClient.AES_128_CBC_OID + algorithm_oid = pki.crypto.AES_128_CBC_OID # algorithm_oid = KeyClient.DES_EDE3_CBC_OID if not nonce_iv: @@ -1015,16 +1018,47 @@ class KeyClient(object): request_id=request_id, trans_wrapped_session_key=base64.b64encode( trans_wrapped_session_key), - payload_encryption_oid=self.encrypt_alg_oid + payload_encryption_oid=self.encrypt_alg_oid, + payload_wrapping_name=self.wrap_name ) key = self.retrieve_key_data(request) if not key_provided and key.encrypted_data is not None: - key.data = self.crypto.symmetric_unwrap( + self.process_returned_key(key, session_key) + return key + + @pki.handle_exceptions() + def process_returned_key(self, key, session_key): + """ + Decrypt the returned key and place in key.data + + The data will either by encrypted using an encryption algorithm - + in which case, the key data will contain an encryption algorithm OID, + or it will be key wrapped - in which case, the key data will contain + a key wrap mechanism name. + + Only one of these should be present. If we are talking to an older + server, and none is present, we will assume encryption. + """ + if key.wrap_algorithm is not None: + if key.encrypt_algorithm_oid is not None: + raise ValueError( + "Both encryptOID and wrapping name have been set " + + "in server response" + ) + # do key unwrapping here + key.data = self.crypto.key_unwrap( + key.wrap_algorithm, key.encrypted_data, session_key, - nonce_iv=key.nonce_data) - return key + key.nonce_data) + return + + # do decryption + key.data = self.crypto.symmetric_unwrap( + key.encrypted_data, + session_key, + nonce_iv=key.nonce_data) @pki.handle_exceptions() def retrieve_key_by_passphrase(self, key_id=None, request_id=None, |
