# Authors: # Endi S. Dewata # # Copyright (C) 2015 Red Hat # see file 'COPYING' for use and warranty information # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . from __future__ import print_function import base64 import collections import errno import getpass import io import json import os import sys import tempfile from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.padding import PKCS7 from cryptography.hazmat.primitives.serialization import ( load_pem_public_key, load_pem_private_key) from ipaclient.frontend import MethodOverride from ipalib import x509 from ipalib.constants import USER_CACHE_PATH from ipalib.frontend import Local, Method, Object from ipalib.util import classproperty from ipalib import api, errors from ipalib import Bytes, Flag, Str from ipalib.plugable import Registry from ipalib import _ from ipapython.dnsutil import DNSName from ipapython.ipa_log_manager import log_mgr logger = log_mgr.get_logger(__name__) def validated_read(argname, filename, mode='r', encoding=None): """Read file and catch errors IOError and UnicodeError (for text files) are turned into a ValidationError """ try: with io.open(filename, mode=mode, encoding=encoding) as f: data = f.read() except IOError as exc: raise errors.ValidationError( name=argname, error=_("Cannot read file '%(filename)s': %(exc)s") % { 'filename': filename, 'exc': exc.args[1] } ) except UnicodeError as exc: raise errors.ValidationError( name=argname, error=_("Cannot decode file '%(filename)s': %(exc)s") % { 'filename': filename, 'exc': exc } ) return data register = Registry() MAX_VAULT_DATA_SIZE = 2**20 # = 1 MB def get_new_password(): """ Gets new password from user and verify it. """ while True: password = getpass.getpass('New password: ').decode( sys.stdin.encoding) password2 = getpass.getpass('Verify password: ').decode( sys.stdin.encoding) if password == password2: return password print(' ** Passwords do not match! **') def get_existing_password(): """ Gets existing password from user. """ return getpass.getpass('Password: ').decode(sys.stdin.encoding) def generate_symmetric_key(password, salt): """ Generates symmetric key from password and salt. """ kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, backend=default_backend() ) return base64.b64encode(kdf.derive(password.encode('utf-8'))) def encrypt(data, symmetric_key=None, public_key=None): """ Encrypts data with symmetric key or public key. """ if symmetric_key: fernet = Fernet(symmetric_key) return fernet.encrypt(data) elif public_key: public_key_obj = load_pem_public_key( data=public_key, backend=default_backend() ) return public_key_obj.encrypt( data, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) ) def decrypt(data, symmetric_key=None, private_key=None): """ Decrypts data with symmetric key or public key. """ if symmetric_key: try: fernet = Fernet(symmetric_key) return fernet.decrypt(data) except InvalidToken: raise errors.AuthenticationError( message=_('Invalid credentials')) elif private_key: try: private_key_obj = load_pem_private_key( data=private_key, password=None, backend=default_backend() ) return private_key_obj.decrypt( data, padding.OAEP( mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None ) ) except ValueError: raise errors.AuthenticationError( message=_('Invalid credentials')) @register(no_fail=True) class _fake_vault(Object): name = 'vault' @register(no_fail=True) class _fake_vault_add_internal(Method): name = 'vault_add_internal' NO_CLI = True @register() class vault_add(Local): __doc__ = _('Create a new vault.') takes_options = ( Str( 'password?', cli_name='password', doc=_('Vault password'), ), Str( # TODO: use File parameter 'password_file?', cli_name='password_file', doc=_('File containing the vault password'), ), Str( # TODO: use File parameter 'public_key_file?', cli_name='public_key_file', doc=_('File containing the vault public key'), ), ) @classmethod def __NO_CLI_getter(cls): return (api.Command.get_plugin('vault_add_internal') is _fake_vault_add_internal) NO_CLI = classproperty(__NO_CLI_getter) @property def api_version(self): return self.api.Command.vault_add_internal.api_version def get_args(self): for arg in self.api.Command.vault_add_internal.args(): yield arg for arg in super(vault_add, self).get_args(): yield arg def get_options(self): for option in self.api.Command.vault_add_internal.options(): if option.name not in ('ipavaultsalt', 'version'): yield option for option in super(vault_add, self).get_options(): yield option def get_output_params(self): for param in self.api.Command.vault_add_internal.output_params(): yield param for param in super(vault_add, self).get_output_params(): yield param def _iter_output(self): return self.api.Command.vault_add_internal.output() def forward(self, *args, **options): vault_type = options.get('ipavaulttype') if vault_type is None: internal_cmd = self.api.Command.vault_add_internal vault_type = internal_cmd.params.ipavaulttype.default password = options.get('password') password_file = options.get('password_file') public_key = options.get('ipavaultpublickey') public_key_file = options.get('public_key_file') # don't send these parameters to server if 'password' in options: del options['password'] if 'password_file' in options: del options['password_file'] if 'public_key_file' in options: del options['public_key_file'] if vault_type != u'symmetric' and (password or password_file): raise errors.MutuallyExclusiveError( reason=_('Password can be specified only for ' 'symmetric vault') ) if vault_type != u'asymmetric' and (public_key or public_key_file): raise errors.MutuallyExclusiveError( reason=_('Public key can be specified only for ' 'asymmetric vault') ) if self.api.env.in_server: backend = self.api.Backend.ldap2 else: backend = self.api.Backend.rpcclient if not backend.isconnected(): backend.connect() if vault_type == u'standard': pass elif vault_type == u'symmetric': # get password if password and password_file: raise errors.MutuallyExclusiveError( reason=_('Password specified multiple times')) elif password: pass elif password_file: password = validated_read('password-file', password_file, encoding='utf-8') password = password.rstrip('\n') else: password = get_new_password() # generate vault salt options['ipavaultsalt'] = os.urandom(16) elif vault_type == u'asymmetric': # get new vault public key if public_key and public_key_file: raise errors.MutuallyExclusiveError( reason=_('Public key specified multiple times')) elif public_key: pass elif public_key_file: public_key = validated_read('public-key-file', public_key_file, mode='rb') # store vault public key options['ipavaultpublickey'] = public_key else: raise errors.ValidationError( name='ipavaultpublickey', error=_('Missing vault public key')) # validate public key and prevent users from accidentally # sending a private key to the server. try: load_pem_public_key( data=public_key, backend=default_backend() ) except ValueError as e: raise errors.ValidationError( name='ipavaultpublickey', error=_('Invalid or unsupported vault public key: %s') % e, ) # create vault response = self.api.Command.vault_add_internal(*args, **options) # prepare parameters for archival opts = options.copy() if 'description' in opts: del opts['description'] if 'ipavaulttype' in opts: del opts['ipavaulttype'] if vault_type == u'symmetric': opts['password'] = password del opts['ipavaultsalt'] elif vault_type == u'asymmetric': del opts['ipavaultpublickey'] # archive blank data self.api.Command.vault_archive(*args, **opts) return response @register(no_fail=True) class _fake_vault_mod_internal(Method): name = 'vault_mod_internal' NO_CLI = True @register() class vault_mod(Local): __doc__ = _('Modify a vault.') takes_options = ( Flag( 'change_password?', doc=_('Change password'), ), Str( 'old_password?', cli_name='old_password', doc=_('Old vault password'), ), Str( # TODO: use File parameter 'old_password_file?', cli_name='old_password_file', doc=_('File containing the old vault password'), ), Str( 'new_password?', cli_name='new_password', doc=_('New vault password'), ), Str( # TODO: use File parameter 'new_password_file?', cli_name='new_password_file', doc=_('File containing the new vault password'), ), Bytes( 'private_key?', cli_name='private_key', doc=_('Old vault private key'), ), Str( # TODO: use File parameter 'private_key_file?', cli_name='private_key_file', doc=_('File containing the old vault private key'), ), Str( # TODO: use File parameter 'public_key_file?', cli_name='public_key_file', doc=_('File containing the new vault public key'), ), ) @classmethod def __NO_CLI_getter(cls): return (api.Command.get_plugin('vault_mod_internal') is _fake_vault_mod_internal) NO_CLI = classproperty(__NO_CLI_getter) @property def api_version(self): return self.api.Command.vault_mod_internal.api_version def get_args(self): for arg in self.api.Command.vault_mod_internal.args(): yield arg for arg in super(vault_mod, self).get_args(): yield arg def get_options(self): for option in self.api.Command.vault_mod_internal.options(): if option.name != 'version': yield option for option in super(vault_mod, self).get_options(): yield option def get_output_params(self): for param in self.api.Command.vault_mod_internal.output_params(): yield param for param in super(vault_mod, self).get_output_params(): yield param def _iter_output(self): return self.api.Command.vault_mod_internal.output() def forward(self, *args, **options): vault_type = options.pop('ipavaulttype', False) salt = options.pop('ipavaultsalt', False) change_password = options.pop('change_password', False) old_password = options.pop('old_password', None) old_password_file = options.pop('old_password_file', None) new_password = options.pop('new_password', None) new_password_file = options.pop('new_password_file', None) old_private_key = options.pop('private_key', None) old_private_key_file = options.pop('private_key_file', None) new_public_key = options.pop('ipavaultpublickey', None) new_public_key_file = options.pop('public_key_file', None) if self.api.env.in_server: backend = self.api.Backend.ldap2 else: backend = self.api.Backend.rpcclient if not backend.isconnected(): backend.connect() # determine the vault type based on parameters specified if vault_type: pass elif change_password or new_password or new_password_file or salt: vault_type = u'symmetric' elif new_public_key or new_public_key_file: vault_type = u'asymmetric' # if vault type is specified, retrieve existing secret if vault_type: opts = options.copy() opts.pop('description', None) opts['password'] = old_password opts['password_file'] = old_password_file opts['private_key'] = old_private_key opts['private_key_file'] = old_private_key_file response = self.api.Command.vault_retrieve(*args, **opts) data = response['result']['data'] opts = options.copy() # if vault type is specified, update crypto attributes if vault_type: opts['ipavaulttype'] = vault_type if vault_type == u'standard': opts['ipavaultsalt'] = None opts['ipavaultpublickey'] = None elif vault_type == u'symmetric': if salt: opts['ipavaultsalt'] = salt else: opts['ipavaultsalt'] = os.urandom(16) opts['ipavaultpublickey'] = None elif vault_type == u'asymmetric': # get new vault public key if new_public_key and new_public_key_file: raise errors.MutuallyExclusiveError( reason=_('New public key specified multiple times')) elif new_public_key: pass elif new_public_key_file: new_public_key = validated_read('public_key_file', new_public_key_file, mode='rb') else: raise errors.ValidationError( name='ipavaultpublickey', error=_('Missing new vault public key')) opts['ipavaultsalt'] = None opts['ipavaultpublickey'] = new_public_key response = self.api.Command.vault_mod_internal(*args, **opts) # if vault type is specified, rearchive existing secret if vault_type: opts = options.copy() opts.pop('description', None) opts['data'] = data opts['password'] = new_password opts['password_file'] = new_password_file opts['override_password'] = True self.api.Command.vault_archive(*args, **opts) return response class _TransportCertCache(collections.MutableMapping): def __init__(self): self._dirname = os.path.join( USER_CACHE_PATH, 'ipa', 'kra-transport-certs') self._transport_certs = {} def _get_filename(self, domain): basename = DNSName(domain).ToASCII() + '.pem' return os.path.join(self._dirname, basename) def __getitem__(self, domain): try: transport_cert = self._transport_certs[domain] except KeyError: transport_cert = None filename = self._get_filename(domain) try: try: transport_cert = x509.load_certificate_from_file(filename) except EnvironmentError as e: if e.errno != errno.ENOENT: raise except Exception: logger.warning("Failed to load %s: %s", filename, exc_info=True) if transport_cert is None: raise KeyError(domain) self._transport_certs[domain] = transport_cert return transport_cert def __setitem__(self, domain, transport_cert): filename = self._get_filename(domain) transport_cert_der = ( transport_cert.public_bytes(serialization.Encoding.DER)) try: try: os.makedirs(self._dirname) except EnvironmentError as e: if e.errno != errno.EEXIST: raise fd, tmpfilename = tempfile.mkstemp(dir=self._dirname) os.close(fd) x509.write_certificate(transport_cert_der, tmpfilename) os.rename(tmpfilename, filename) except Exception: logger.warning("Failed to save %s", filename, exc_info=True) self._transport_certs[domain] = transport_cert def __delitem__(self, domain): filename = self._get_filename(domain) try: os.unlink(filename) except EnvironmentError as e: if e.errno != errno.ENOENT: logger.warning("Failed to remove %s", filename, exc_info=True) del self._transport_certs[domain] def __len__(self): return len(self._transport_certs) def __iter__(self): return iter(self._transport_certs) _transport_cert_cache = _TransportCertCache() @register(override=True, no_fail=True) class vaultconfig_show(MethodOverride): def forward(self, *args, **options): file = options.get('transport_out') # don't send these parameters to server if 'transport_out' in options: del options['transport_out'] response = super(vaultconfig_show, self).forward(*args, **options) # cache transport certificate transport_cert = x509.load_certificate( response['result']['transport_cert'], x509.DER) _transport_cert_cache[self.api.env.domain] = transport_cert if file: with open(file, 'w') as f: f.write(response['result']['transport_cert']) return response class ModVaultData(Local): def _generate_session_key(self): key_length = max(algorithms.TripleDES.key_sizes) algo = algorithms.TripleDES(os.urandom(key_length // 8)) return algo def _do_internal(self, algo, transport_cert, raise_unexpected, *args, **options): public_key = transport_cert.public_key() # wrap session key with transport certificate wrapped_session_key = public_key.encrypt( algo.key, padding.PKCS1v15() ) options['session_key'] = wrapped_session_key name = self.name + '_internal' try: return self.api.Command[name](*args, **options) except errors.NotFound: raise except (errors.InternalError, errors.ExecutionError, errors.GenericError): _transport_cert_cache.pop(self.api.env.domain, None) if raise_unexpected: raise def internal(self, algo, *args, **options): """ Calls the internal counterpart of the command. """ domain = self.api.env.domain # try call with cached transport certificate transport_cert = _transport_cert_cache.get(domain) if transport_cert is not None: result = self._do_internal(algo, transport_cert, False, *args, **options) if result is not None: return result # retrieve and cache transport certificate self.api.Command.vaultconfig_show() transport_cert = _transport_cert_cache[domain] # call with the retrieved transport certificate return self._do_internal(algo, transport_cert, True, *args, **options) @register(no_fail=True) class _fake_vault_archive_internal(Method): name = 'vault_archive_internal' NO_CLI = True @register() class vault_archive(ModVaultData): __doc__ = _('Archive data into a vault.') takes_options = ( Bytes( 'data?', doc=_('Binary data to archive'), ), Str( # TODO: use File parameter 'in?', doc=_('File containing data to archive'), ), Str( 'password?', cli_name='password', doc=_('Vault password'), ), Str( # TODO: use File parameter 'password_file?', cli_name='password_file', doc=_('File containing the vault password'), ), Flag( 'override_password?', doc=_('Override existing password'), ), ) @classmethod def __NO_CLI_getter(cls): return (api.Command.get_plugin('vault_archive_internal') is _fake_vault_archive_internal) NO_CLI = classproperty(__NO_CLI_getter) @property def api_version(self): return self.api.Command.vault_archive_internal.api_version def get_args(self): for arg in self.api.Command.vault_archive_internal.args(): yield arg for arg in super(vault_archive, self).get_args(): yield arg def get_options(self): for option in self.api.Command.vault_archive_internal.options(): if option.name not in ('nonce', 'session_key', 'vault_data', 'version'): yield option for option in super(vault_archive, self).get_options(): yield option def get_output_params(self): for param in self.api.Command.vault_archive_internal.output_params(): yield param for param in super(vault_archive, self).get_output_params(): yield param def _iter_output(self): return self.api.Command.vault_archive_internal.output() def _wrap_data(self, algo, json_vault_data): """Encrypt data with wrapped session key and transport cert :param bytes algo: wrapping algorithm instance :param bytes json_vault_data: dumped vault data :return: """ nonce = os.urandom(algo.block_size // 8) # wrap vault_data with session key padder = PKCS7(algo.block_size).padder() padded_data = padder.update(json_vault_data) padded_data += padder.finalize() cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend()) encryptor = cipher.encryptor() wrapped_vault_data = encryptor.update(padded_data) + encryptor.finalize() return nonce, wrapped_vault_data def forward(self, *args, **options): data = options.get('data') input_file = options.get('in') password = options.get('password') password_file = options.get('password_file') override_password = options.pop('override_password', False) # don't send these parameters to server if 'data' in options: del options['data'] if 'in' in options: del options['in'] if 'password' in options: del options['password'] if 'password_file' in options: del options['password_file'] # get data if data and input_file: raise errors.MutuallyExclusiveError( reason=_('Input data specified multiple times')) elif data: if len(data) > MAX_VAULT_DATA_SIZE: raise errors.ValidationError(name="data", error=_( "Size of data exceeds the limit. Current vault data size " "limit is %(limit)d B") % {'limit': MAX_VAULT_DATA_SIZE}) elif input_file: try: stat = os.stat(input_file) except OSError as exc: raise errors.ValidationError(name="in", error=_( "Cannot read file '%(filename)s': %(exc)s") % {'filename': input_file, 'exc': exc.args[1]}) if stat.st_size > MAX_VAULT_DATA_SIZE: raise errors.ValidationError(name="in", error=_( "Size of data exceeds the limit. Current vault data size " "limit is %(limit)d B") % {'limit': MAX_VAULT_DATA_SIZE}) data = validated_read('in', input_file, mode='rb') else: data = '' if self.api.env.in_server: backend = self.api.Backend.ldap2 else: backend = self.api.Backend.rpcclient if not backend.isconnected(): backend.connect() # retrieve vault info vault = self.api.Command.vault_show(*args, **options)['result'] vault_type = vault['ipavaulttype'][0] if vault_type == u'standard': encrypted_key = None elif vault_type == u'symmetric': # get password if password and password_file: raise errors.MutuallyExclusiveError( reason=_('Password specified multiple times')) elif password: pass elif password_file: password = validated_read('password-file', password_file, encoding='utf-8') password = password.rstrip('\n') else: if override_password: password = get_new_password() else: password = get_existing_password() if not override_password: # verify password by retrieving existing data opts = options.copy() opts['password'] = password try: self.api.Command.vault_retrieve(*args, **opts) except errors.NotFound: pass salt = vault['ipavaultsalt'][0] # generate encryption key from vault password encryption_key = generate_symmetric_key(password, salt) # encrypt data with encryption key data = encrypt(data, symmetric_key=encryption_key) encrypted_key = None elif vault_type == u'asymmetric': public_key = vault['ipavaultpublickey'][0].encode('utf-8') # generate encryption key encryption_key = base64.b64encode(os.urandom(32)) # encrypt data with encryption key data = encrypt(data, symmetric_key=encryption_key) # encrypt encryption key with public key encrypted_key = encrypt(encryption_key, public_key=public_key) else: raise errors.ValidationError( name='vault_type', error=_('Invalid vault type')) vault_data = { 'data': base64.b64encode(data).decode('utf-8') } if encrypted_key: vault_data[u'encrypted_key'] = base64.b64encode(encrypted_key)\ .decode('utf-8') json_vault_data = json.dumps(vault_data).encode('utf-8') # generate session key algo = self._generate_session_key() # wrap vault data nonce, wrapped_vault_data = self._wrap_data(algo, json_vault_data) options.update( nonce=nonce, vault_data=wrapped_vault_data ) return self.internal(algo, *args, **options) @register(no_fail=True) class _fake_vault_retrieve_internal(Method): name = 'vault_retrieve_internal' NO_CLI = True @register() class vault_retrieve(ModVaultData): __doc__ = _('Retrieve a data from a vault.') takes_options = ( Str( 'out?', doc=_('File to store retrieved data'), ), Str( 'password?', cli_name='password', doc=_('Vault password'), ), Str( # TODO: use File parameter 'password_file?', cli_name='password_file', doc=_('File containing the vault password'), ), Bytes( 'private_key?', cli_name='private_key', doc=_('Vault private key'), ), Str( # TODO: use File parameter 'private_key_file?', cli_name='private_key_file', doc=_('File containing the vault private key'), ), ) has_output_params = ( Bytes( 'data', label=_('Data'), ), ) @classmethod def __NO_CLI_getter(cls): return (api.Command.get_plugin('vault_retrieve_internal') is _fake_vault_retrieve_internal) NO_CLI = classproperty(__NO_CLI_getter) @property def api_version(self): return self.api.Command.vault_retrieve_internal.api_version def get_args(self): for arg in self.api.Command.vault_retrieve_internal.args(): yield arg for arg in super(vault_retrieve, self).get_args(): yield arg def get_options(self): for option in self.api.Command.vault_retrieve_internal.options(): if option.name not in ('session_key', 'version'): yield option for option in super(vault_retrieve, self).get_options(): yield option def get_output_params(self): for param in self.api.Command.vault_retrieve_internal.output_params(): yield param for param in super(vault_retrieve, self).get_output_params(): yield param def _iter_output(self): return self.api.Command.vault_retrieve_internal.output() def _unwrap_response(self, algo, nonce, vault_data): cipher = Cipher(algo, modes.CBC(nonce), backend=default_backend()) # decrypt decryptor = cipher.decryptor() padded_data = decryptor.update(vault_data) padded_data += decryptor.finalize() # remove padding unpadder = PKCS7(algo.block_size).unpadder() json_vault_data = unpadder.update(padded_data) json_vault_data += unpadder.finalize() # load JSON return json.loads(json_vault_data.decode('utf-8')) def forward(self, *args, **options): output_file = options.get('out') password = options.get('password') password_file = options.get('password_file') private_key = options.get('private_key') private_key_file = options.get('private_key_file') # don't send these parameters to server if 'out' in options: del options['out'] if 'password' in options: del options['password'] if 'password_file' in options: del options['password_file'] if 'private_key' in options: del options['private_key'] if 'private_key_file' in options: del options['private_key_file'] if self.api.env.in_server: backend = self.api.Backend.ldap2 else: backend = self.api.Backend.rpcclient if not backend.isconnected(): backend.connect() # retrieve vault info vault = self.api.Command.vault_show(*args, **options)['result'] vault_type = vault['ipavaulttype'][0] # generate session key algo = self._generate_session_key() # send retrieval request to server response = self.internal(algo, *args, **options) # unwrap data with session key vault_data = self._unwrap_response( algo, response['result']['nonce'], response['result']['vault_data'] ) del algo data = base64.b64decode(vault_data[u'data'].encode('utf-8')) encrypted_key = None if 'encrypted_key' in vault_data: encrypted_key = base64.b64decode(vault_data[u'encrypted_key'] .encode('utf-8')) if vault_type == u'standard': pass elif vault_type == u'symmetric': salt = vault['ipavaultsalt'][0] # get encryption key from vault password if password and password_file: raise errors.MutuallyExclusiveError( reason=_('Password specified multiple times')) elif password: pass elif password_file: password = validated_read('password-file', password_file, encoding='utf-8') password = password.rstrip('\n') else: password = get_existing_password() # generate encryption key from password encryption_key = generate_symmetric_key(password, salt) # decrypt data with encryption key data = decrypt(data, symmetric_key=encryption_key) elif vault_type == u'asymmetric': # get encryption key with vault private key if private_key and private_key_file: raise errors.MutuallyExclusiveError( reason=_('Private key specified multiple times')) elif private_key: pass elif private_key_file: private_key = validated_read('private-key-file', private_key_file, mode='rb') else: raise errors.ValidationError( name='private_key', error=_('Missing vault private key')) # decrypt encryption key with private key encryption_key = decrypt(encrypted_key, private_key=private_key) # decrypt data with encryption key data = decrypt(data, symmetric_key=encryption_key) else: raise errors.ValidationError( name='vault_type', error=_('Invalid vault type')) if output_file: with open(output_file, 'w') as f: f.write(data) else: response['result'] = {'data': data} return response