diff options
Diffstat (limited to 'base/common/python/pki/crypto.py')
-rw-r--r-- | base/common/python/pki/crypto.py | 206 |
1 files changed, 192 insertions, 14 deletions
diff --git a/base/common/python/pki/crypto.py b/base/common/python/pki/crypto.py index 86fa16eb7..b767abd2e 100644 --- a/base/common/python/pki/crypto.py +++ b/base/common/python/pki/crypto.py @@ -23,13 +23,21 @@ Module containing crypto classes. """ from __future__ import absolute_import import abc -import nss.nss as nss import os -import six import shutil import subprocess import tempfile +import nss.nss as nss +import six +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import ( + Cipher, algorithms, modes +) +from cryptography.hazmat.primitives import padding +from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15 +import cryptography.x509 + class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): """ @@ -43,30 +51,32 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): @abc.abstractmethod def initialize(self): """ Initialization code """ - pass - @staticmethod @abc.abstractmethod - def generate_nonce_iv(mechanism): + def get_supported_algorithm_keyset(self): + """ returns highest supported algorithm keyset """ + + @abc.abstractmethod + def set_algorithm_keyset(self, level): + """ sets required keyset """ + + @abc.abstractmethod + def generate_nonce_iv(self, mechanism): """ Create a random initialization vector """ - pass @abc.abstractmethod def generate_symmetric_key(self, mechanism=None, size=0): """ Generate and return a symmetric key """ - pass @abc.abstractmethod def generate_session_key(self): """ Generate a session key to be used for wrapping data to the DRM This must return a 3DES 168 bit key """ - pass @abc.abstractmethod def symmetric_wrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): """ encrypt data using a symmetric key (wrapping key)""" - pass @abc.abstractmethod def symmetric_unwrap(self, data, wrapping_key, mechanism=None, @@ -77,7 +87,6 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): The mechanism is the type of key used to do the wrapping. It defaults to a 56 bit DES3 key. """ - pass @abc.abstractmethod def asymmetric_wrap(self, data, wrapping_cert, mechanism=None): @@ -86,12 +95,10 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)): The mechanism is the type of symmetric key, which defaults to a 56 bit DES3 key. """ - pass # abc.abstractmethod def get_cert(self, cert_nick): """ Get the certificate for the specified cert_nick. """ - pass class NSSCryptoProvider(CryptoProvider): @@ -152,6 +159,18 @@ class NSSCryptoProvider(CryptoProvider): """ nss.nss_init(self.certdb_dir) + def get_supported_algorithm_keyset(self): + """ returns highest supported algorithm keyset """ + return 0 + + def set_algorithm_keyset(self, level): + """ sets required keyset """ + if level > 0: + raise Exception("Invalid keyset") + + # basically, do what we have always done, no need to set anything + # special here. + def import_cert(self, cert_nick, cert, trust=',,'): """ Import a certificate into the nss database """ @@ -170,8 +189,7 @@ class NSSCryptoProvider(CryptoProvider): '-i', cert_file.name] subprocess.check_call(command) - @staticmethod - def generate_nonce_iv(mechanism=nss.CKM_DES3_CBC_PAD): + def generate_nonce_iv(self, mechanism=nss.CKM_DES3_CBC_PAD): """ Create a random initialization vector """ iv_length = nss.get_iv_length(mechanism) if iv_length > 0: @@ -237,6 +255,8 @@ class NSSCryptoProvider(CryptoProvider): """ :param data Data to be wrapped :param wrapping_key Symmetric key to wrap data + :param mechanism Mechanism to user when wrapping + :param nonce_iv Nonce to use when wrapping Wrap (encrypt) data using the supplied symmetric key """ @@ -255,6 +275,7 @@ class NSSCryptoProvider(CryptoProvider): """ :param data Data to be unwrapped :param wrapping_key Symmetric key to unwrap data + :param mechanism Mechanism to use when wrapping :param nonce_iv iv data Unwrap (decrypt) data using the supplied symmetric key @@ -288,3 +309,160 @@ class NSSCryptoProvider(CryptoProvider): Searches NSS database and returns SecItem object for this certificate. """ return nss.find_cert_from_nickname(cert_nick) + + +class CryptographyCryptoProvider(CryptoProvider): + """ + Class that defines python-cryptography implementation of CryptoProvider. + Requires a PEM file containing the agent cert to be initialized. + + Note that all inputs and outputs are unencoded. + """ + + def __init__(self, transport_cert_nick, transport_cert, + backend=default_backend()): + """ Initialize python-cryptography + """ + super(CryptographyCryptoProvider, self).__init__() + self.certs = {} + + if not isinstance(transport_cert, cryptography.x509.Certificate): + # it's a file name + with open(transport_cert, 'r') as f: + transport_pem = f.read() + transport_cert = cryptography.x509.load_pem_x509_certificate( + transport_pem, + backend) + + self.certs[transport_cert_nick] = transport_cert + + # default to AES + self.encrypt_alg = algorithms.AES + self.encrypt_mode = modes.CBC + self.encrypt_size = 128 + self.backend = backend + + def initialize(self): + """ + Any operations here that need to be performed before crypto + operations. + """ + pass + + def get_supported_algorithm_keyset(self): + """ returns highest supported algorithm keyset """ + return 1 + + def set_algorithm_keyset(self, level): + """ sets required keyset """ + if level > 1: + raise ValueError("Invalid keyset") + elif level == 1: + self.encrypt_alg = algorithms.AES + self.encrypt_mode = modes.CBC + self.encrypt_size = 128 + elif level == 0: + self.encrypt_alg = algorithms.TripleDES + self.encrypt_mode = modes.CBC + self.encrypt_size = 168 + + def generate_nonce_iv(self, mechanism='AES'): + """ Create a random initialization vector """ + return os.urandom(self.encrypt_alg.block_size // 8) + + def generate_symmetric_key(self, mechanism=None, size=0): + """ Returns a symmetric key. + """ + if mechanism is None: + size = self.encrypt_size // 8 + return os.urandom(size) + + def generate_session_key(self): + """ Returns a session key to be used when wrapping secrets for the DRM. + """ + return self.generate_symmetric_key() + + def symmetric_wrap(self, data, wrapping_key, mechanism=None, + nonce_iv=None): + """ + :param data Data to be wrapped + :param wrapping_key Symmetric key to wrap data + :param mechanism Mechanism to use for wrapping key + :param nonce_iv Nonce for initialization vector + + Wrap (encrypt) data using the supplied symmetric key + """ + # TODO(alee) Not sure yet how to handle non-default mechanisms + # For now, lets just ignore them + + if wrapping_key is None: + raise ValueError("Wrapping key must be provided") + + if self.encrypt_mode.name == "CBC": + padder = padding.PKCS7(self.encrypt_alg.block_size).padder() + padded_data = padder.update(data) + padder.finalize() + data = padded_data + else: + raise ValueError('Only CBC mode is currently supported') + + cipher = Cipher(self.encrypt_alg(wrapping_key), + self.encrypt_mode(nonce_iv), + backend=self.backend) + + encryptor = cipher.encryptor() + ct = encryptor.update(data) + encryptor.finalize() + return ct + + def symmetric_unwrap(self, data, wrapping_key, + mechanism=None, nonce_iv=None): + """ + :param data Data to be unwrapped + :param wrapping_key Symmetric key to unwrap data + :param mechanism Mechanism to use when unwrapping + :param nonce_iv iv data + + Unwrap (decrypt) data using the supplied symmetric key + """ + + # TODO(alee) As above, no idea what to do with mechanism + # ignoring for now. + + if wrapping_key is None: + raise ValueError("Wrapping key must be provided") + + cipher = Cipher(self.encrypt_alg(wrapping_key), + self.encrypt_mode(nonce_iv), + backend=self.backend) + + decryptor = cipher.decryptor() + unwrapped = decryptor.update(data) + decryptor.finalize() + + if self.encrypt_mode.name == 'CBC': + unpadder = padding.PKCS7(self.encrypt_alg.block_size).unpadder() + unpadded = unpadder.update(unwrapped) + unpadder.finalize() + unwrapped = unpadded + else: + raise ValueError('Only CBC mode is currently supported') + + return unwrapped + + def asymmetric_wrap(self, data, wrapping_cert, + mechanism=None): + """ + :param data Data to be wrapped + :param wrapping_cert Public key to wrap data + :param mechanism algorithm of symmetric key to be wrapped + + Wrap (encrypt) data using the supplied asymmetric key + """ + public_key = wrapping_cert.public_key() + return public_key.encrypt( + data, + PKCS1v15() + ) + + def get_cert(self, cert_nick): + """ + :param cert_nick Nickname for the certificate to be returned. + """ + return self.certs[cert_nick] |