diff options
Diffstat (limited to 'base/server/python/pki/server/cli/subsystem.py')
-rw-r--r-- | base/server/python/pki/server/cli/subsystem.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/base/server/python/pki/server/cli/subsystem.py b/base/server/python/pki/server/cli/subsystem.py index 03d48f926..6d60468a6 100644 --- a/base/server/python/pki/server/cli/subsystem.py +++ b/base/server/python/pki/server/cli/subsystem.py @@ -24,8 +24,11 @@ import base64 import getopt import getpass import nss.nss as nss +import os import string +import subprocess import sys +from tempfile import mkstemp import pki.cli import pki.nssdb @@ -299,6 +302,7 @@ class SubsystemCertCLI(pki.cli.CLI): self.add_module(SubsystemCertShowCLI()) self.add_module(SubsystemCertExportCLI()) self.add_module(SubsystemCertUpdateCLI()) + self.add_module(SubsystemCertValidateCLI()) @staticmethod def print_subsystem_cert(cert, show_all=False): @@ -713,3 +717,117 @@ class SubsystemCertUpdateCLI(pki.cli.CLI): self.print_message('Updated "%s" subsystem certificate' % cert_id) SubsystemCertCLI.print_subsystem_cert(subsystem_cert) + + +class SubsystemCertValidateCLI(pki.cli.CLI): + + def __init__(self): + super(SubsystemCertValidateCLI, self).__init__( + 'validate', 'Validate subsystem certificates') + + def usage(self): + print('Usage: pki-server subsystem-cert-validate [OPTIONS] <subsystem ID> [<cert_id>]') + print() + print(' -i, --instance <instance ID> Instance ID (default: pki-tomcat).') + print(' -v, --verbose Run in verbose mode.') + print(' --help Show help message.') + print() + + def execute(self, argv): + + try: + opts, args = getopt.gnu_getopt(argv, 'i:v', [ + 'instance=', + 'verbose', 'help']) + + except getopt.GetoptError as e: + print('ERROR: ' + str(e)) + self.usage() + sys.exit(1) + + if len(args) < 1: + print('ERROR: missing subsystem ID') + self.usage() + sys.exit(1) + + subsystem_name = args[0] + + if len(args) >=2: + cert_id = args[1] + else: + cert_id = None + + instance_name = 'pki-tomcat' + + for o, a in opts: + if o in ('-i', '--instance'): + instance_name = a + + elif o in ('-v', '--verbose'): + self.set_verbose(True) + + elif o == '--help': + self.print_help() + sys.exit() + + else: + self.print_message('ERROR: unknown option ' + o) + self.usage() + sys.exit(1) + + instance = pki.server.PKIInstance(instance_name) + instance.load() + + subsystem = instance.get_subsystem(subsystem_name) + + if cert_id is not None: + certs = [subsystem.get_subsystem_cert(cert_id)] + else: + certs = subsystem.find_system_certs() + + certs_valid = True + for cert in certs: + token = cert['token'] + + # get token password and store in temporary file + if token == 'Internal Key Storage Token': + passwd = instance.get_password('internal') + else: + passwd = instance.get_password("hardware-%s" % token) + + pwfile_handle, pwfile_path = mkstemp() + os.write(pwfile_handle, passwd) + os.close(pwfile_handle) + + cmd = ['pki', '-d', instance.nssdb_dir, + '-W', pwfile_path ] + + if token != 'Internal Key Storage Token': + cmd.extend(['--token', token]) + + cmd.extend( + ['client-cert-validate', + cert['nickname'], + '--certusage', cert['certusage']] + ) + + try: + subprocess.check_output(cmd, stderr=subprocess.STDOUT) + self.print_message("Valid certificate : %s" %cert['nickname']) + except subprocess.CalledProcessError as e: + certs_valid = False + if e.returncode == 1: + self.print_message("Invalid certificate: %s" + % cert['nickname']) + else: + self.print_message("Error in validating certificate: %s" + % cert['nickname']) + self.print_message(e.output) + finally: + os.unlink(pwfile_path) + + if certs_valid: + sys.exit(0) + else: + sys.exit(1) + |