diff options
Diffstat (limited to 'base/common/python/pki/crypto.py')
-rw-r--r-- | base/common/python/pki/crypto.py | 268 |
1 files changed, 268 insertions, 0 deletions
diff --git a/base/common/python/pki/crypto.py b/base/common/python/pki/crypto.py new file mode 100644 index 000000000..174e681b8 --- /dev/null +++ b/base/common/python/pki/crypto.py @@ -0,0 +1,268 @@ +#!/usr/bin/python +# Authors: +# Ade Lee <alee@redhat.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright (C) 2013 Red Hat, Inc. +# All rights reserved. +# +""" +Module containing crypto classes. +""" +import abc +import exceptions +import nss.nss as nss +import os +import shutil +import subprocess +import tempfile + + +class CryptoProvider(object): + """ + Abstract class containing methods to do cryptographic operations. + """ + __metaclass__ = abc.ABCMeta + + def __init__(self): + """ Constructor """ + pass + + @abc.abstractmethod + def initialize(self): + """ Initialization code """ + pass + + @staticmethod + @abc.abstractmethod + def generate_nonce_iv(mechanism): + """ Create a random initialization vector """ + pass + + @abc.abstractmethod + def generate_symmetric_key(self, mechanism=None, size=0): + """ Generate and return a symmetric key """ + pass + + @abc.abstractmethod + def generate_session_key(self): + """ Generate a session key to be used for wrapping data to the DRM + This must return a 3DES 168 bit key """ + pass + + @abc.abstractmethod + def symmetric_wrap(self, data, wrapping_key, mechanism=None, nonce_iv=None): + """ encrypt data using a symmetric key (wrapping key)""" + pass + + @abc.abstractmethod + def symmetric_unwrap(self, data, wrapping_key, mechanism=None, + nonce_iv=None): + """ decrypt data originally encrypted with symmetric key (wrapping key) + + We expect the data and nonce_iv values to be base64 encoded. + The mechanism is the type of key used to do the wrapping. It defaults + to a 56 bit DES3 key. + """ + pass + + @abc.abstractmethod + def asymmetric_wrap(self, data, wrapping_cert, mechanism=None): + """ encrypt a symmetric key with the public key of a transport cert. + + The mechanism is the type of symmetric key, which defaults to a 56 bit + DES3 key. + """ + pass + + #abc.abstractmethod + def get_cert(self, cert_nick): + """ Get the certificate for the specified cert_nick. """ + pass + + +class NSSCryptoProvider(CryptoProvider): + """ + Class that defines NSS implementation of CryptoProvider. + Requires an NSS database to have been set up and initialized. + + Note that all inputs and outputs are unencoded. + """ + + @staticmethod + def setup_database(db_dir, password, over_write=False): + """ Create an NSS database """ + if os.path.exists(db_dir): + if not over_write: + raise exceptions.IOError("Directory already exists.") + if os.path.isdir(db_dir): + shutil.rmtree(db_dir) + else: + os.remove(db_dir) + os.makedirs(db_dir) + + home = os.path.expanduser("~") + with tempfile.NamedTemporaryFile(dir=home) as pwd_file: + pwd_file.write(password) + pwd_file.flush() + command = ['certutil', '-N', '-d', db_dir, '-f', pwd_file.name] + subprocess.check_call(command) + + def __init__(self, certdb_dir, certdb_password): + """ Initialize nss and nss related parameters + + This method expects a NSS database to have already been created at + certdb_dir with password certdb_password. + """ + CryptoProvider.__init__(self) + self.certdb_dir = certdb_dir + self.certdb_password = certdb_password + self.nonce_iv = "e4:bb:3b:d3:c3:71:2e:58" + + def initialize(self): + """ + Initialize the nss db. Must be done before any crypto operations + """ + nss.nss_init(self.certdb_dir) + + def import_cert(self, cert_nick, cert, trust): + """ Import a certificate into the nss database + """ + # certutil -A -d db_dir -n cert_nick -t trust -i cert_file -a + with tempfile.NamedTemporaryFile() as cert_file: + cert_file.write(cert) + cert_file.flush() + command = ['certutil', '-A', '-d', self.certdb_dir, + '-n', cert_nick, '-t', trust, + '-i', cert_file.name] + subprocess.check_call(command) + + @staticmethod + def generate_nonce_iv(mechanism=nss.CKM_DES3_CBC_PAD): + """ Create a random initialization vector """ + iv_length = nss.get_iv_length(mechanism) + if iv_length > 0: + iv_data = nss.generate_random(iv_length) + return iv_data + else: + return None + + @classmethod + def setup_contexts(cls, mechanism, sym_key, nonce_iv): + """ Set up contexts to do wrapping/unwrapping by symmetric keys. """ + # Get a PK11 slot based on the cipher + slot = nss.get_best_slot(mechanism) + + if sym_key is None: + sym_key = slot.key_gen(mechanism, + None, + slot.get_best_key_length(mechanism)) + + # If initialization vector was supplied use it, otherwise set it to None + if nonce_iv: + iv_si = nss.SecItem(nonce_iv) + iv_param = nss.param_from_iv(mechanism, iv_si) + else: + iv_data = cls.generate_nonce_iv(mechanism) + if iv_data is not None: + iv_si = nss.SecItem(iv_data) + iv_param = nss.param_from_iv(mechanism, iv_si) + else: + iv_param = None + + # Create an encoding context + encoding_ctx = nss.create_context_by_sym_key(mechanism, nss.CKA_ENCRYPT, + sym_key, iv_param) + + # Create a decoding context + decoding_ctx = nss.create_context_by_sym_key(mechanism, nss.CKA_DECRYPT, + sym_key, iv_param) + + return encoding_ctx, decoding_ctx + + def generate_symmetric_key(self, mechanism=nss.CKM_DES3_CBC_PAD, size=0): + """ Returns a symmetric key. + + Note that for fixed length keys, this length should be 0. If no length + is provided, then the function will either use 0 (for fixed length keys) + or the maximum available length for that algorithm and the token. + """ + slot = nss.get_best_slot(mechanism) + if size == 0: + size = slot.get_best_key_length(mechanism) + return slot.key_gen(mechanism, None, size) + + def generate_session_key(self): + """ Returns a session key to be used when wrapping secrets for the DRM + This will return a 168 bit 3DES key. + """ + return self.generate_symmetric_key(mechanism=nss.CKM_DES3_CBC_PAD) + + def symmetric_wrap(self, data, wrapping_key, mechanism=nss.CKM_DES3_CBC_PAD, + nonce_iv=None): + """ + :param data Data to be wrapped + :param wrapping_key Symmetric key to wrap data + + Wrap (encrypt) data using the supplied symmetric key + """ + if nonce_iv is None: + nonce_iv = nss.read_hex(self.nonce_iv) + + encoding_ctx, _decoding_ctx = self.setup_contexts(mechanism, + wrapping_key, + nonce_iv) + wrapped_data = encoding_ctx.cipher_op(data) +\ + encoding_ctx.digest_final() + return wrapped_data + + def symmetric_unwrap(self, data, wrapping_key, + mechanism=nss.CKM_DES3_CBC_PAD, nonce_iv=None): + """ + :param data Data to be unwrapped + :param wrapping_key Symmetric key to unwrap data + :param nonce_iv iv data + + Unwrap (decrypt) data using the supplied symmetric key + """ + if nonce_iv is None: + nonce_iv = nss.read_hex(self.nonce_iv) + + _encoding_ctx, decoding_ctx = self.setup_contexts(mechanism, + wrapping_key, + nonce_iv) + unwrapped_data = decoding_ctx.cipher_op(data) \ + + decoding_ctx.digest_final() + return unwrapped_data + + def asymmetric_wrap(self, data, wrapping_cert, + mechanism=nss.CKM_DES3_CBC_PAD): + """ + :param data Data to be wrapped + :param wrapping_cert Public key to wrap data + :param mechanism algorithm of symmetric key to be wrapped + + Wrap (encrypt) data using the supplied asymmetric key + """ + public_key = wrapping_cert.subject_public_key_info.public_key + return nss.pub_wrap_sym_key(mechanism, public_key, data) + + def get_cert(self, cert_nick): + """ + :param cert_nick Nickname for the certificate to be returned + + Searches NSS database and returns SecItem object for this certificate. + """ + return nss.find_cert_from_nickname(cert_nick) |