summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/crypto.py
diff options
context:
space:
mode:
Diffstat (limited to 'base/common/python/pki/crypto.py')
-rw-r--r--base/common/python/pki/crypto.py206
1 files changed, 192 insertions, 14 deletions
diff --git a/base/common/python/pki/crypto.py b/base/common/python/pki/crypto.py
index 86fa16eb7..b767abd2e 100644
--- a/base/common/python/pki/crypto.py
+++ b/base/common/python/pki/crypto.py
@@ -23,13 +23,21 @@ Module containing crypto classes.
"""
from __future__ import absolute_import
import abc
-import nss.nss as nss
import os
-import six
import shutil
import subprocess
import tempfile
+import nss.nss as nss
+import six
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.ciphers import (
+ Cipher, algorithms, modes
+)
+from cryptography.hazmat.primitives import padding
+from cryptography.hazmat.primitives.asymmetric.padding import PKCS1v15
+import cryptography.x509
+
class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)):
"""
@@ -43,30 +51,32 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)):
@abc.abstractmethod
def initialize(self):
""" Initialization code """
- pass
- @staticmethod
@abc.abstractmethod
- def generate_nonce_iv(mechanism):
+ def get_supported_algorithm_keyset(self):
+ """ returns highest supported algorithm keyset """
+
+ @abc.abstractmethod
+ def set_algorithm_keyset(self, level):
+ """ sets required keyset """
+
+ @abc.abstractmethod
+ def generate_nonce_iv(self, mechanism):
""" Create a random initialization vector """
- pass
@abc.abstractmethod
def generate_symmetric_key(self, mechanism=None, size=0):
""" Generate and return a symmetric key """
- pass
@abc.abstractmethod
def generate_session_key(self):
""" Generate a session key to be used for wrapping data to the DRM
This must return a 3DES 168 bit key """
- pass
@abc.abstractmethod
def symmetric_wrap(self, data, wrapping_key, mechanism=None,
nonce_iv=None):
""" encrypt data using a symmetric key (wrapping key)"""
- pass
@abc.abstractmethod
def symmetric_unwrap(self, data, wrapping_key, mechanism=None,
@@ -77,7 +87,6 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)):
The mechanism is the type of key used to do the wrapping. It defaults
to a 56 bit DES3 key.
"""
- pass
@abc.abstractmethod
def asymmetric_wrap(self, data, wrapping_cert, mechanism=None):
@@ -86,12 +95,10 @@ class CryptoProvider(six.with_metaclass(abc.ABCMeta, object)):
The mechanism is the type of symmetric key, which defaults to a 56 bit
DES3 key.
"""
- pass
# abc.abstractmethod
def get_cert(self, cert_nick):
""" Get the certificate for the specified cert_nick. """
- pass
class NSSCryptoProvider(CryptoProvider):
@@ -152,6 +159,18 @@ class NSSCryptoProvider(CryptoProvider):
"""
nss.nss_init(self.certdb_dir)
+ def get_supported_algorithm_keyset(self):
+ """ returns highest supported algorithm keyset """
+ return 0
+
+ def set_algorithm_keyset(self, level):
+ """ sets required keyset """
+ if level > 0:
+ raise Exception("Invalid keyset")
+
+ # basically, do what we have always done, no need to set anything
+ # special here.
+
def import_cert(self, cert_nick, cert, trust=',,'):
""" Import a certificate into the nss database
"""
@@ -170,8 +189,7 @@ class NSSCryptoProvider(CryptoProvider):
'-i', cert_file.name]
subprocess.check_call(command)
- @staticmethod
- def generate_nonce_iv(mechanism=nss.CKM_DES3_CBC_PAD):
+ def generate_nonce_iv(self, mechanism=nss.CKM_DES3_CBC_PAD):
""" Create a random initialization vector """
iv_length = nss.get_iv_length(mechanism)
if iv_length > 0:
@@ -237,6 +255,8 @@ class NSSCryptoProvider(CryptoProvider):
"""
:param data Data to be wrapped
:param wrapping_key Symmetric key to wrap data
+ :param mechanism Mechanism to user when wrapping
+ :param nonce_iv Nonce to use when wrapping
Wrap (encrypt) data using the supplied symmetric key
"""
@@ -255,6 +275,7 @@ class NSSCryptoProvider(CryptoProvider):
"""
:param data Data to be unwrapped
:param wrapping_key Symmetric key to unwrap data
+ :param mechanism Mechanism to use when wrapping
:param nonce_iv iv data
Unwrap (decrypt) data using the supplied symmetric key
@@ -288,3 +309,160 @@ class NSSCryptoProvider(CryptoProvider):
Searches NSS database and returns SecItem object for this certificate.
"""
return nss.find_cert_from_nickname(cert_nick)
+
+
+class CryptographyCryptoProvider(CryptoProvider):
+ """
+ Class that defines python-cryptography implementation of CryptoProvider.
+ Requires a PEM file containing the agent cert to be initialized.
+
+ Note that all inputs and outputs are unencoded.
+ """
+
+ def __init__(self, transport_cert_nick, transport_cert,
+ backend=default_backend()):
+ """ Initialize python-cryptography
+ """
+ super(CryptographyCryptoProvider, self).__init__()
+ self.certs = {}
+
+ if not isinstance(transport_cert, cryptography.x509.Certificate):
+ # it's a file name
+ with open(transport_cert, 'r') as f:
+ transport_pem = f.read()
+ transport_cert = cryptography.x509.load_pem_x509_certificate(
+ transport_pem,
+ backend)
+
+ self.certs[transport_cert_nick] = transport_cert
+
+ # default to AES
+ self.encrypt_alg = algorithms.AES
+ self.encrypt_mode = modes.CBC
+ self.encrypt_size = 128
+ self.backend = backend
+
+ def initialize(self):
+ """
+ Any operations here that need to be performed before crypto
+ operations.
+ """
+ pass
+
+ def get_supported_algorithm_keyset(self):
+ """ returns highest supported algorithm keyset """
+ return 1
+
+ def set_algorithm_keyset(self, level):
+ """ sets required keyset """
+ if level > 1:
+ raise ValueError("Invalid keyset")
+ elif level == 1:
+ self.encrypt_alg = algorithms.AES
+ self.encrypt_mode = modes.CBC
+ self.encrypt_size = 128
+ elif level == 0:
+ self.encrypt_alg = algorithms.TripleDES
+ self.encrypt_mode = modes.CBC
+ self.encrypt_size = 168
+
+ def generate_nonce_iv(self, mechanism='AES'):
+ """ Create a random initialization vector """
+ return os.urandom(self.encrypt_alg.block_size // 8)
+
+ def generate_symmetric_key(self, mechanism=None, size=0):
+ """ Returns a symmetric key.
+ """
+ if mechanism is None:
+ size = self.encrypt_size // 8
+ return os.urandom(size)
+
+ def generate_session_key(self):
+ """ Returns a session key to be used when wrapping secrets for the DRM.
+ """
+ return self.generate_symmetric_key()
+
+ def symmetric_wrap(self, data, wrapping_key, mechanism=None,
+ nonce_iv=None):
+ """
+ :param data Data to be wrapped
+ :param wrapping_key Symmetric key to wrap data
+ :param mechanism Mechanism to use for wrapping key
+ :param nonce_iv Nonce for initialization vector
+
+ Wrap (encrypt) data using the supplied symmetric key
+ """
+ # TODO(alee) Not sure yet how to handle non-default mechanisms
+ # For now, lets just ignore them
+
+ if wrapping_key is None:
+ raise ValueError("Wrapping key must be provided")
+
+ if self.encrypt_mode.name == "CBC":
+ padder = padding.PKCS7(self.encrypt_alg.block_size).padder()
+ padded_data = padder.update(data) + padder.finalize()
+ data = padded_data
+ else:
+ raise ValueError('Only CBC mode is currently supported')
+
+ cipher = Cipher(self.encrypt_alg(wrapping_key),
+ self.encrypt_mode(nonce_iv),
+ backend=self.backend)
+
+ encryptor = cipher.encryptor()
+ ct = encryptor.update(data) + encryptor.finalize()
+ return ct
+
+ def symmetric_unwrap(self, data, wrapping_key,
+ mechanism=None, nonce_iv=None):
+ """
+ :param data Data to be unwrapped
+ :param wrapping_key Symmetric key to unwrap data
+ :param mechanism Mechanism to use when unwrapping
+ :param nonce_iv iv data
+
+ Unwrap (decrypt) data using the supplied symmetric key
+ """
+
+ # TODO(alee) As above, no idea what to do with mechanism
+ # ignoring for now.
+
+ if wrapping_key is None:
+ raise ValueError("Wrapping key must be provided")
+
+ cipher = Cipher(self.encrypt_alg(wrapping_key),
+ self.encrypt_mode(nonce_iv),
+ backend=self.backend)
+
+ decryptor = cipher.decryptor()
+ unwrapped = decryptor.update(data) + decryptor.finalize()
+
+ if self.encrypt_mode.name == 'CBC':
+ unpadder = padding.PKCS7(self.encrypt_alg.block_size).unpadder()
+ unpadded = unpadder.update(unwrapped) + unpadder.finalize()
+ unwrapped = unpadded
+ else:
+ raise ValueError('Only CBC mode is currently supported')
+
+ return unwrapped
+
+ def asymmetric_wrap(self, data, wrapping_cert,
+ mechanism=None):
+ """
+ :param data Data to be wrapped
+ :param wrapping_cert Public key to wrap data
+ :param mechanism algorithm of symmetric key to be wrapped
+
+ Wrap (encrypt) data using the supplied asymmetric key
+ """
+ public_key = wrapping_cert.public_key()
+ return public_key.encrypt(
+ data,
+ PKCS1v15()
+ )
+
+ def get_cert(self, cert_nick):
+ """
+ :param cert_nick Nickname for the certificate to be returned.
+ """
+ return self.certs[cert_nick]