diff options
Diffstat (limited to 'base/server/python/pki/server/cli/subsystem.py')
-rw-r--r-- | base/server/python/pki/server/cli/subsystem.py | 265 |
1 files changed, 263 insertions, 2 deletions
diff --git a/base/server/python/pki/server/cli/subsystem.py b/base/server/python/pki/server/cli/subsystem.py index a9857ba5f..1381a6162 100644 --- a/base/server/python/pki/server/cli/subsystem.py +++ b/base/server/python/pki/server/cli/subsystem.py @@ -1,6 +1,7 @@ # Authors: # Endi S. Dewata <edewata@redhat.com> # Abhijeet Kasurde <akasurde@redhat.com> +# Dinesh Prasanth M K <dmoluguw@redhat.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -26,7 +27,10 @@ import getpass import os import subprocess import sys -from tempfile import mkstemp +import random +import tempfile +import shutil +import re import pki.cli import pki.nssdb @@ -370,6 +374,7 @@ class SubsystemCertCLI(pki.cli.CLI): self.add_module(SubsystemCertExportCLI()) self.add_module(SubsystemCertUpdateCLI()) self.add_module(SubsystemCertValidateCLI()) + self.add_module(SubsystemCertRenewCLI()) @staticmethod def print_subsystem_cert(cert, show_all=False): @@ -1006,7 +1011,7 @@ class SubsystemCertValidateCLI(pki.cli.CLI): # get internal token password and store in temporary file passwd = instance.get_token_password() - pwfile_handle, pwfile_path = mkstemp() + pwfile_handle, pwfile_path = tempfile.mkstemp() os.write(pwfile_handle, passwd) os.close(pwfile_handle) @@ -1035,3 +1040,259 @@ class SubsystemCertValidateCLI(pki.cli.CLI): finally: os.unlink(pwfile_path) + + +class SubsystemCertRenewCLI(pki.cli.CLI): + def __init__(self): + super(SubsystemCertRenewCLI, self).__init__( + 'renew', 'Renew subsystem certificate') + + def usage(self): + print('Usage: pki-server subsystem-cert-renew [OPTIONS] <subsystem ID> <cert ID>') + print() + print(' -i, --instance <instance ID> Instance ID (default: pki-tomcat).') + print(' -v, --verbose Run in verbose mode.') + print(' --help Show help message.') + print(' --temp Create temporary certificate.') + print(' --serial <number> Provide serial number for temp certificate.') + print() + + def execute(self, argv): + try: + opts, args = getopt.gnu_getopt(argv, 'i:v', [ + 'instance=', + 'verbose', + 'help', 'temp', 'serial=']) + + except getopt.GetoptError as e: + print('ERROR: ' + str(e)) + self.usage() + sys.exit(1) + + instance_name = 'pki-tomcat' + is_permanent_cert = True + serial = None + + for o, a in opts: + if o in ('-i', '--instance'): + instance_name = a + + elif o in ('-v', '--verbose'): + self.set_verbose(True) + + elif o == '--help': + self.usage() + sys.exit() + + elif o == '--temp': + is_permanent_cert = False + + elif o == '--serial': + serial = a + + else: + self.print_message('ERROR: unknown option ' + o) + self.usage() + sys.exit(1) + + if len(args) < 1: + print('ERROR: missing subsystem ID') + self.usage() + sys.exit(1) + + if len(args) < 2: + print('ERROR: missing cert ID') + self.usage() + sys.exit(1) + + subsystem_name = args[0] + cert_id = args[1] + + instance = pki.server.PKIInstance(instance_name) + + if not instance.is_valid(): + print('ERROR: Invalid instance %s.' % instance_name) + sys.exit(1) + + # Load the instance. Default: pki-tomcat + instance.load() + + # Get the subsystem - Eg: ca, kra, tps, tks + subsystem = instance.get_subsystem(subsystem_name) + if not subsystem: + print('ERROR: No %s subsystem in instance ' + '%s.' % (subsystem_name, instance_name)) + sys.exit(1) + + cert = subsystem.get_subsystem_cert(cert_id) + + nssdb = instance.open_nssdb() + tmpdir = tempfile.mkdtemp() + + try: + new_cert_file = os.path.join(tmpdir, cert_id + '.crt') + + # Check if the request is for permanent certificate creation + if is_permanent_cert: + # Serial number for permanent certificate must be auto-generated + if serial: + raise Exception('--serial not allowed for permanent cert') + # Fixme: Get the serial from LDAP DB (Method 3a) + else: + if not serial: + # Fixme: Get the highest serial number from NSS DB and add 1 (Method 2b) + # If admin doesn't provide a serial number, generate one + serial = str(random.randint( + int(subsystem.config.get('dbs.beginSerialNumber', '1')), + int(subsystem.config.get('dbs.endSerialNumber', '10000000')))) + + if cert_id == 'sslserver': + self.renew_ssl_cert(subsystem=subsystem, is_permanent_cert=is_permanent_cert, tmpdir=tmpdir, + new_cert_file=new_cert_file, nssdb=nssdb, serial=serial) + + elif cert_id == 'ca_ocsp_signing': + self.renew_ocsp_cert(is_permanent_cert=is_permanent_cert) + + elif cert_id == 'ca_audit_signing': + self.renew_audit_cert(is_permanent_cert=is_permanent_cert) + + elif cert_id == 'subsystem': + self.renew_subsystem_cert(is_permanent_cert=is_permanent_cert) + + else: + # renewal not yet supported + raise Exception('Renewal for %s not yet supported.' % cert_id) + + # Import cert into NSS db + if self.verbose: + print('Removing old %s certificate from NSS database.' % cert_id) + nssdb.remove_cert(cert['nickname']) + + if self.verbose: + print('Adding new %s certificate into NSS database.' % cert_id) + nssdb.add_cert( + nickname=cert['nickname'], + cert_file=new_cert_file) + + # Update CS.cfg with the new certificate + if self.verbose: + print('Updating CS.cfg') + data = nssdb.get_cert( + nickname=cert['nickname'], + output_format='base64') + cert['data'] = data + subsystem.update_subsystem_cert(cert) + subsystem.save() + + finally: + nssdb.close() + shutil.rmtree(tmpdir) + + @staticmethod + def setup_temp_renewal(subsystem, cert_id, tmpdir): + + csr_file = os.path.join(tmpdir, cert_id + '.csr') + ca_cert_file = os.path.join(tmpdir, 'ca_certificate.crt') + + # Export the CSR for the cert + cert_request = subsystem.get_subsystem_cert(cert_id).get('request', None) + if cert_request is None: + print("ERROR: Unable to find certificate request for %s" % cert_id) + sys.exit(1) + + csr_data = pki.nssdb.convert_csr(cert_request, 'base64', 'pem') + with open(csr_file, 'w') as f: + f.write(csr_data) + + # Extract SKI + # 1. Get the CA certificate + # 2. Then get the SKI from it + ca_signing_cert = subsystem.get_subsystem_cert('signing') + ca_cert_data = ca_signing_cert.get('data', None) + if ca_cert_data is None: + print("ERROR: Unable to find certificate data for CA signing certificate.") + sys.exit(1) + + ca_cert = pki.nssdb.convert_cert(ca_cert_data, 'base64', 'pem') + with open(ca_cert_file, 'w') as f: + f.write(ca_cert) + + ca_cert_retrieve_cmd = [ + 'openssl', + 'x509', + '-in', ca_cert_file, + '-noout', + '-text' + ] + + ca_cert_details = subprocess.check_output(ca_cert_retrieve_cmd) + aki = re.search(r'Subject Key Identifier.*\n.*?(.*?)\n', ca_cert_details).group(1) + + # Add 0x to represent this is a Hex + aki = '0x' + aki.strip().replace(':', '') + + return ca_signing_cert, aki, csr_file + + def renew_ssl_cert(self, subsystem, serial, tmpdir, is_permanent_cert, new_cert_file, nssdb): + if self.verbose: + print('Creating SSL server certificate.') + + if is_permanent_cert: + # TODO: Online renewal + print('SSL cert online renewal not yet supported.') + else: + # Generate temp SSL Certificate signed by CA + + ca_signing_cert, aki, csr_file = self.setup_temp_renewal( + subsystem=subsystem, tmpdir=tmpdir, cert_id='sslserver') + + # --keyUsage + key_usage_ext = { + 'digitalSignature': True, + 'nonRepudiation': True, + 'keyEncipherment': True, + 'dataEncipherment': True, + 'critical': True + } + + # -3 + aki_ext = { + 'auth_key_id': aki + } + + # --extKeyUsage + ext_key_usage_ext = { + 'serverAuth': True + } + + rc = nssdb.create_cert( + issuer=ca_signing_cert['nickname'], + request_file=csr_file, + cert_file=new_cert_file, + serial=serial, + key_usage_ext=key_usage_ext, + aki_ext=aki_ext, + ext_key_usage_ext=ext_key_usage_ext) + if rc: + raise Exception('Failed to generate CA-signed temp SSL certificate. RC: %d' % rc) + + def renew_ocsp_cert(self, is_permanent_cert): + if is_permanent_cert: + # TODO: Online renewal + raise Exception('OCSP cert online renewal not yet supported.') + else: + raise Exception('Temp certificate for OCSP is not supported.') + + def renew_subsystem_cert(self, is_permanent_cert): + if is_permanent_cert: + # TODO: Online renewal + raise Exception('Subsystem cert online renewal not yet supported.') + else: + raise Exception('Temp certificate for subsystem is not supported.') + + def renew_audit_cert(self, is_permanent_cert): + if is_permanent_cert: + # TODO: Online renewal + raise Exception('Audit signing cert online renewal not yet supported.') + else: + raise Exception('Temp certificate for audit signing is not supported.') |