# Copyright (C) 2015 IPA Project Contributors, see COPYING for license from __future__ import print_function, absolute_import import os import sys from custodia.plugin import CSStore from ipaplatform.paths import paths from ipaplatform.constants import constants from ipapython import ipautil class UnknownKeyName(Exception): pass class DBMAPHandler: dbtype = None def __init__(self, config, dbmap, nickname): dbtype = dbmap.get('type') if dbtype is None or dbtype != self.dbtype: raise ValueError( "Invalid type '{}', expected '{}'".format( dbtype, self.dbtype ) ) self.config = config self.dbmap = dbmap self.nickname = nickname def export_key(self): raise NotImplementedError def import_key(self, value): raise NotImplementedError class DBMAPCommandHandler(DBMAPHandler): def __init__(self, config, dbmap, nickname): super().__init__(config, dbmap, nickname) self.runas = dbmap.get('runas') self.command = os.path.join( paths.IPA_CUSTODIA_HANDLER, dbmap['command'] ) def run_handler(self, extra_args=(), stdin=None): """Run handler script to export / import key material """ args = [self.command] args.extend(extra_args) kwargs = dict( runas=self.runas, encoding='utf-8', ) if stdin: args.extend(['--import', '-']) kwargs.update(stdin=stdin) else: args.extend(['--export', '-']) kwargs.update(capture_output=True) result = ipautil.run(args, **kwargs) if stdin is None: return result.output else: return None def log_error(error): print(error, file=sys.stderr) class NSSWrappedCertDB(DBMAPCommandHandler): """ Store that extracts private keys from an NSSDB, wrapped with the private key of the primary CA. """ dbtype = 'NSSDB' def export_key(self): return self.run_handler(['--nickname', self.nickname]) class NSSCertDB(DBMAPCommandHandler): dbtype = 'NSSDB' def export_key(self): return self.run_handler(['--nickname', self.nickname]) def import_key(self, value): return self.run_handler( ['--nickname', self.nickname], stdin=value ) # Exfiltrate the DM password Hash so it can be set in replica's and this # way let a replica be install without knowing the DM password and yet # still keep the DM password synchronized across replicas class DMLDAP(DBMAPCommandHandler): dbtype = 'DMLDAP' def __init__(self, config, dbmap, nickname): super().__init__(config, dbmap, nickname) if nickname != 'DMHash': raise UnknownKeyName("Unknown Key Named '%s'" % nickname) def export_key(self): return self.run_handler() def import_key(self, value): self.run_handler(stdin=value) class PEMFileHandler(DBMAPCommandHandler): dbtype = 'PEM' def export_key(self): return self.run_handler() def import_key(self, value): return self.run_handler(stdin=value) NAME_DB_MAP = { 'ca': { 'type': 'NSSDB', 'handler': NSSCertDB, 'command': 'ipa-custodia-pki-tomcat', 'runas': constants.PKI_USER, }, 'ca_wrapped': { 'type': 'NSSDB', 'handler': NSSWrappedCertDB, 'command': 'ipa-custodia-pki-tomcat-wrapped', 'runas': constants.PKI_USER, }, 'ra': { 'type': 'PEM', 'handler': PEMFileHandler, 'command': 'ipa-custodia-ra-agent', 'runas': None, # import needs root permission to write to directory }, 'dm': { 'type': 'DMLDAP', 'handler': DMLDAP, 'command': 'ipa-custodia-dmldap', 'runas': None, # root } } class IPASecStore(CSStore): def __init__(self, config=None): self.config = config def _get_handler(self, key): path = key.split('/', 3) if len(path) != 3 or path[0] != 'keys': raise ValueError('Invalid name') if path[1] not in NAME_DB_MAP: raise UnknownKeyName("Unknown DB named '%s'" % path[1]) dbmap = NAME_DB_MAP[path[1]] return dbmap['handler'](self.config, dbmap, path[2]) def get(self, key): try: key_handler = self._get_handler(key) value = key_handler.export_key() except Exception as e: # pylint: disable=broad-except log_error('Error retrieving key "%s": %s' % (key, str(e))) value = None return value def set(self, key, value, replace=False): try: key_handler = self._get_handler(key) key_handler.import_key(value) except Exception as e: # pylint: disable=broad-except log_error('Error storing key "%s": %s' % (key, str(e))) def list(self, keyfilter=None): raise NotImplementedError def cut(self, key): raise NotImplementedError def span(self, key): raise NotImplementedError # backwards compatibility with FreeIPA 4.3 and 4.4. iSecStore = IPASecStore