summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEndi S. Dewata <edewata@redhat.com>2015-11-12 00:23:26 +0100
committerMatthew Harmsen <mharmsen@pki.usersys.redhat.com>2016-02-22 20:18:10 -0700
commit0fb33918773529206879c665211019e0ecb26d48 (patch)
treeb5d3472d91e7a1a971f1bc81a82e9eee3ceda715
parent134d62bd45389979af8d06002b348599b8757b74 (diff)
downloadpki-0fb33918773529206879c665211019e0ecb26d48.tar.gz
pki-0fb33918773529206879c665211019e0ecb26d48.tar.xz
pki-0fb33918773529206879c665211019e0ecb26d48.zip
Added pki-server subsystem-cert-export command.
A new command has been added to export a system certificate, the CSR, and the key. This command can be used to migrate a system certificate into another instance. https://fedorahosted.org/pki/ticket/456 (cherry picked from commit 9dce4a497f7c977a3c453972706eeb325bd33275)
-rw-r--r--base/common/python/pki/nss.py336
-rw-r--r--base/server/python/pki/server/__init__.py6
-rw-r--r--base/server/python/pki/server/cli/subsystem.py126
3 files changed, 468 insertions, 0 deletions
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
new file mode 100644
index 000000000..f36b771f8
--- /dev/null
+++ b/base/common/python/pki/nss.py
@@ -0,0 +1,336 @@
+#!/usr/bin/python
+# Authors:
+# Endi S. Dewata <edewata@redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Copyright (C) 2015 Red Hat, Inc.
+# All rights reserved.
+#
+
+import base64
+import os
+import shutil
+import subprocess
+import tempfile
+
+
+CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
+CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----'
+CERT_FOOTER = '-----END CERTIFICATE-----'
+
+
+def convert_data(data, input_format, output_format, header=None, footer=None):
+
+ if input_format == 'base64' and output_format == 'pem':
+
+ # split a single line into multiple lines
+ lines = [data[i:i+64] for i in range(0, len(data), 64)]
+ return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
+
+ if input_format == 'pem' and output_format == 'base64':
+
+ # join multiple lines into a single line
+ lines = []
+ for line in data.splitlines():
+ line = line.rstrip('\r\n')
+ if line == header:
+ continue
+ if line == footer:
+ continue
+ lines.append(line)
+
+ return ''.join(lines)
+
+ raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
+
+def convert_csr(csr_data, input_format, output_format):
+
+ return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
+
+def convert_cert(cert_data, input_format, output_format):
+
+ return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+
+
+class NSSDatabase(object):
+
+ def __init__(self, directory, password=None, password_file=None):
+ self.directory = directory
+
+ self.tmpdir = tempfile.mkdtemp()
+
+ if password:
+ self.password_file = os.path.join(self.tmpdir, 'password.txt')
+ with open(self.password_file, 'w') as f:
+ f.write(password)
+
+ elif password_file:
+ self.password_file = password_file
+
+ else:
+ raise Exception('Missing NSS database password')
+
+ def close(self):
+ shutil.rmtree(self.tmpdir)
+
+ def add_cert(self,
+ nickname, cert_file,
+ trust_attributes='u,u,u'):
+
+ subprocess.check_call([
+ 'certutil',
+ '-A',
+ '-d', self.directory,
+ '-n', nickname,
+ '-i', cert_file,
+ '-t', trust_attributes
+ ])
+
+ def modify_cert(self,
+ nickname,
+ trust_attributes='u,u,u'):
+
+ subprocess.check_call([
+ 'certutil',
+ '-M',
+ '-d', self.directory,
+ '-n', nickname,
+ '-t', trust_attributes
+ ])
+
+ def create_noise(self, noise_file, size=2048):
+
+ subprocess.check_call([
+ 'openssl',
+ 'rand',
+ '-out', noise_file,
+ str(size)
+ ])
+
+ def create_request(self,
+ subject_dn,
+ noise_file,
+ request_file):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ binary_request_file = os.path.join(tmpdir, 'request.bin')
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
+
+ # generate binary request
+ subprocess.check_call([
+ 'certutil',
+ '-R',
+ '-d', self.directory,
+ '-f', self.password_file,
+ '-s', subject_dn,
+ '-z', noise_file,
+ '-o', binary_request_file
+ ])
+
+ # encode binary request in base-64
+ subprocess.check_call([
+ 'BtoA', binary_request_file, b64_request_file])
+
+ # read base-64 request
+ with open(b64_request_file, 'r') as f:
+ b64_request = f.read()
+
+ # add header and footer
+ with open(request_file, 'w') as f:
+ f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
+ f.write(b64_request)
+ f.write('-----END NEW CERTIFICATE REQUEST-----\n')
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def create_self_signed_ca_cert(self,
+ subject_dn,
+ request_file,
+ cert_file,
+ serial='1',
+ validity=240):
+
+ p = subprocess.Popen([
+ 'certutil',
+ '-C',
+ '-x',
+ '-d', self.directory,
+ '-f', self.password_file,
+ '-c', subject_dn,
+ '-a',
+ '-i', request_file,
+ '-o', cert_file,
+ '-m', serial,
+ '-v', str(validity),
+ '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
+ '-2',
+ '-3',
+ '--extSKID',
+ '--extAIA'
+ ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ keystroke = ''
+
+ # Is this a CA certificate [y/N]?
+ keystroke += 'y\n'
+
+ # Enter the path length constraint, enter to skip [<0 for unlimited path]:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += 'y\n'
+
+ # Enter value for the authKeyID extension [y/N]?
+ keystroke += 'y\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Enter value for the authCertSerial field, enter to omit:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Adding Subject Key ID extension.
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # Enter access method type for Authority Information Access extension:
+ keystroke += '2\n'
+
+ # Select one of the following general name type:
+ keystroke += '7\n'
+
+ # TODO: replace with actual hostname name and port number
+ # Enter data:
+ keystroke += 'http://server.example.com:8080/ca/ocsp\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Add another location to the Authority Information Access extension [y/N]
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ p.communicate(keystroke)
+
+ rc = p.wait()
+
+ if rc:
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc)
+
+ def get_cert(self, nickname, output_format='pem'):
+
+ if output_format == 'pem':
+ output_format_option = '-a'
+
+ elif output_format == 'base64':
+ output_format_option = '-r'
+
+ else:
+ raise Exception('Unsupported output format: %s' % output_format)
+
+ cert_data = subprocess.check_output([
+ 'certutil',
+ '-L',
+ '-d', self.directory,
+ '-n', nickname,
+ output_format_option
+ ])
+
+ if output_format == 'base64':
+ cert_data = base64.b64encode(cert_data)
+
+ return cert_data
+
+ def remove_cert(self, nickname):
+
+ subprocess.check_call([
+ 'certutil',
+ '-D',
+ '-d', self.directory,
+ '-n', nickname
+ ])
+
+ def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ subprocess.check_call([
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-i', pkcs12_file,
+ '-w', password_file
+ ])
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ subprocess.check_call([
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-o', pkcs12_file,
+ '-w', password_file,
+ '-n', nickname
+ ])
+
+ finally:
+ shutil.rmtree(tmpdir)
diff --git a/base/server/python/pki/server/__init__.py b/base/server/python/pki/server/__init__.py
index dbb18cd3d..0cc267826 100644
--- a/base/server/python/pki/server/__init__.py
+++ b/base/server/python/pki/server/__init__.py
@@ -33,6 +33,7 @@ import subprocess
import tempfile
import pki
+import pki.nss
INSTANCE_BASE_DIR = '/var/lib/pki'
REGISTRY_DIR = '/etc/sysconfig/pki'
@@ -303,6 +304,11 @@ class PKIInstance(object):
return password
+ def open_nssdb(self):
+ return pki.nss.NSSDatabase(
+ directory=self.nssdb_dir,
+ password=self.get_password('internal'))
+
def get_subsystem(self, name):
for subsystem in self.subsystems:
if name == subsystem.name:
diff --git a/base/server/python/pki/server/cli/subsystem.py b/base/server/python/pki/server/cli/subsystem.py
index cd0e8566e..cb27adc1d 100644
--- a/base/server/python/pki/server/cli/subsystem.py
+++ b/base/server/python/pki/server/cli/subsystem.py
@@ -23,11 +23,13 @@ from __future__ import absolute_import
from __future__ import print_function
import base64
import getopt
+import getpass
import nss.nss as nss
import string
import sys
import pki.cli
+import pki.nss
import pki.server
@@ -294,6 +296,7 @@ class SubsystemCertCLI(pki.cli.CLI):
self.add_module(SubsystemCertFindCLI())
self.add_module(SubsystemCertShowCLI())
+ self.add_module(SubsystemCertExportCLI())
self.add_module(SubsystemCertUpdateCLI())
@staticmethod
@@ -438,6 +441,129 @@ class SubsystemCertShowCLI(pki.cli.CLI):
SubsystemCertCLI.print_subsystem_cert(subsystem_cert)
+class SubsystemCertExportCLI(pki.cli.CLI):
+
+ def __init__(self):
+ super(SubsystemCertExportCLI, self).__init__(
+ 'export', 'Export subsystem certificate')
+
+ def usage(self):
+ print('Usage: pki-server subsystem-cert-export [OPTIONS] <subsystem ID> <cert ID>')
+ print()
+ print(' -i, --instance <instance ID> Instance ID (default: pki-tomcat).')
+ print(' --cert-file <path> Output file to store the exported certificate in PEM format.')
+ print(' --csr-file <path> Output file to store the exported CSR in PEM format.')
+ print(' --pkcs12-file <path> Output file to store the exported certificate and key in PKCS #12 format.')
+ print(' --pkcs12-password <password> Password for the PKCS #12 file.')
+ print(' --pkcs12-password-file <path> Input file containing the password for the PKCS #12 file.')
+ print(' -v, --verbose Run in verbose mode.')
+ print(' --help Show help message.')
+ print()
+
+ def execute(self, argv):
+
+ try:
+ opts, args = getopt.gnu_getopt(argv, 'i:v', [
+ 'instance=', 'cert-file=', 'csr-file=',
+ 'pkcs12-file=', 'pkcs12-password=', 'pkcs12-password-file=',
+ 'verbose', 'help'])
+
+ except getopt.GetoptError as e:
+ print('ERROR: ' + str(e))
+ self.usage()
+ sys.exit(1)
+
+ if len(args) < 1:
+ print('ERROR: missing subsystem ID')
+ self.usage()
+ sys.exit(1)
+
+ if len(args) < 2:
+ print('ERROR: missing cert ID')
+ self.usage()
+ sys.exit(1)
+
+ subsystem_name = args[0]
+ cert_id = args[1]
+ instance_name = 'pki-tomcat'
+ cert_file = None
+ csr_file = None
+ pkcs12_file = None
+ pkcs12_password = None
+ pkcs12_password_file = None
+
+ for o, a in opts:
+ if o in ('-i', '--instance'):
+ instance_name = a
+
+ elif o == '--cert-file':
+ cert_file = a
+
+ elif o == '--csr-file':
+ csr_file = a
+
+ elif o == '--pkcs12-file':
+ pkcs12_file = a
+
+ elif o == '--pkcs12-password':
+ pkcs12_password = a
+
+ elif o == '--pkcs12-password-file':
+ pkcs12_password_file = a
+
+ elif o in ('-v', '--verbose'):
+ self.set_verbose(True)
+
+ elif o == '--help':
+ self.print_help()
+ sys.exit()
+
+ else:
+ print('ERROR: unknown option ' + o)
+ self.usage()
+ sys.exit(1)
+
+ if not cert_file and not csr_file and not pkcs12_file:
+ print('ERROR: missing output file')
+ self.usage()
+ sys.exit(1)
+
+ instance = pki.server.PKIInstance(instance_name)
+ instance.load()
+
+ subsystem = instance.get_subsystem(subsystem_name)
+ subsystem_cert = subsystem.get_subsystem_cert(cert_id)
+
+ if cert_file:
+
+ cert_data = pki.nss.convert_cert(subsystem_cert['data'], 'base64', 'pem')
+ with open(cert_file, 'w') as f:
+ f.write(cert_data)
+
+ if csr_file:
+
+ csr_data = pki.nss.convert_csr(subsystem_cert['request'], 'base64', 'pem')
+ with open(csr_file, 'w') as f:
+ f.write(csr_data)
+
+ if pkcs12_file:
+
+ if not pkcs12_password and not pkcs12_password_file:
+ pkcs12_password = getpass.getpass(prompt='Enter password for PKCS #12 file: ')
+
+ nssdb = instance.open_nssdb()
+ try:
+ nssdb.export_pkcs12(
+ pkcs12_file=pkcs12_file,
+ nickname=subsystem_cert['nickname'],
+ pkcs12_password=pkcs12_password,
+ pkcs12_password_file=pkcs12_password_file)
+ finally:
+ nssdb.close()
+
+ self.print_message('Exported %s certificate' % cert_id)
+
+
class SubsystemCertUpdateCLI(pki.cli.CLI):
def __init__(self):