summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/nss.py
diff options
context:
space:
mode:
authorEndi S. Dewata <edewata@redhat.com>2015-11-07 00:09:19 +0100
committerMatthew Harmsen <mharmsen@pki.usersys.redhat.com>2016-02-22 20:19:30 -0700
commitbc0de424aa8c56d2278e41b7786ca202b7e64cc3 (patch)
tree35800e3d43bcdb58e7c561ab0a058674475aa7c7 /base/common/python/pki/nss.py
parent4a81377c26e68c48b78c90f2a61970373dd1a6fa (diff)
downloadpki-bc0de424aa8c56d2278e41b7786ca202b7e64cc3.tar.gz
pki-bc0de424aa8c56d2278e41b7786ca202b7e64cc3.tar.xz
pki-bc0de424aa8c56d2278e41b7786ca202b7e64cc3.zip
Added mechanism to import existing CA certificate.
The deployment procedure for external CA has been modified such that it generates the CA CSR before starting the server. This allows the same procedure to be used to import CA certificate from an existing server. It also removes the requirement to keep the server running while waiting to get the CSR signed by an external CA. https://fedorahosted.org/pki/ticket/456 (cherry picked from commit 20c985ae773b26f653cac6d22bd9d93923e18c8e)
Diffstat (limited to 'base/common/python/pki/nss.py')
-rw-r--r--base/common/python/pki/nss.py247
1 files changed, 220 insertions, 27 deletions
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
index f36b771f8..196fe462f 100644
--- a/base/common/python/pki/nss.py
+++ b/base/common/python/pki/nss.py
@@ -32,12 +32,19 @@ CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
CERT_HEADER = '-----BEGIN CERTIFICATE-----'
CERT_FOOTER = '-----END CERTIFICATE-----'
+PKCS7_HEADER = '-----BEGIN PKCS7-----'
+PKCS7_FOOTER = '-----END PKCS7-----'
+
def convert_data(data, input_format, output_format, header=None, footer=None):
+ if input_format == output_format:
+ return data
+
if input_format == 'base64' and output_format == 'pem':
# split a single line into multiple lines
+ data = data.rstrip('\r\n')
lines = [data[i:i+64] for i in range(0, len(data), 64)]
return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
@@ -65,11 +72,32 @@ def convert_cert(cert_data, input_format, output_format):
return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+def convert_pkcs7(pkcs7_data, input_format, output_format):
+
+ return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER)
+
+def get_file_type(filename):
+
+ with open(filename, 'r') as f:
+ data = f.read()
+
+ if data.startswith(CSR_HEADER):
+ return 'csr'
+
+ if data.startswith(CERT_HEADER):
+ return 'cert'
+
+ if data.startswith(PKCS7_HEADER):
+ return 'pkcs7'
+
+ return None
+
class NSSDatabase(object):
- def __init__(self, directory, password=None, password_file=None):
+ def __init__(self, directory, token='internal', password=None, password_file=None):
self.directory = directory
+ self.token = token
self.tmpdir = tempfile.mkdtemp()
@@ -88,29 +116,38 @@ class NSSDatabase(object):
shutil.rmtree(self.tmpdir)
def add_cert(self,
- nickname, cert_file,
- trust_attributes='u,u,u'):
+ nickname,
+ cert_file,
+ trust_attributes=',,'):
- subprocess.check_call([
+ cmd = [
'certutil',
'-A',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
'-i', cert_file,
'-t', trust_attributes
- ])
+ ]
+
+ subprocess.check_call(cmd)
def modify_cert(self,
nickname,
- trust_attributes='u,u,u'):
+ trust_attributes):
- subprocess.check_call([
+ cmd = [
'certutil',
'-M',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
'-t', trust_attributes
- ])
+ ]
+
+ subprocess.check_call(cmd)
def create_noise(self, noise_file, size=2048):
@@ -123,27 +160,56 @@ class NSSDatabase(object):
def create_request(self,
subject_dn,
- noise_file,
- request_file):
+ request_file,
+ noise_file=None,
+ key_type=None,
+ key_size=None,
+ curve=None,
+ hash_alg=None):
tmpdir = tempfile.mkdtemp()
try:
+ if not noise_file:
+ noise_file = os.path.join(tmpdir, 'noise.bin')
+ if key_size:
+ size = key_size
+ else:
+ size = 2048
+ self.create_noise(
+ noise_file=noise_file,
+ size=size)
+
binary_request_file = os.path.join(tmpdir, 'request.bin')
- b64_request_file = os.path.join(tmpdir, 'request.b64')
- # generate binary request
- subprocess.check_call([
+ cmd = [
'certutil',
'-R',
'-d', self.directory,
+ '-h', self.token,
'-f', self.password_file,
'-s', subject_dn,
- '-z', noise_file,
- '-o', binary_request_file
- ])
+ '-o', binary_request_file,
+ '-z', noise_file
+ ]
+
+ if key_type:
+ cmd.extend(['-k', key_type])
+
+ if key_size:
+ cmd.extend(['-g', str(key_size)])
+
+ if curve:
+ cmd.extend(['-q', curve])
+
+ if hash_alg:
+ cmd.extend(['-Z', hash_alg])
+
+ # generate binary request
+ subprocess.check_call(cmd)
# encode binary request in base-64
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
subprocess.check_call([
'BtoA', binary_request_file, b64_request_file])
@@ -167,11 +233,12 @@ class NSSDatabase(object):
serial='1',
validity=240):
- p = subprocess.Popen([
+ cmd = [
'certutil',
'-C',
'-x',
'-d', self.directory,
+ '-h', self.token,
'-f', self.password_file,
'-c', subject_dn,
'-a',
@@ -184,7 +251,9 @@ class NSSDatabase(object):
'-3',
'--extSKID',
'--extAIA'
- ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ ]
+
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
keystroke = ''
@@ -245,7 +314,7 @@ class NSSDatabase(object):
rc = p.wait()
if rc:
- raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc)
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc)
def get_cert(self, nickname, output_format='pem'):
@@ -258,13 +327,17 @@ class NSSDatabase(object):
else:
raise Exception('Unsupported output format: %s' % output_format)
- cert_data = subprocess.check_output([
+ cmd = [
'certutil',
'-L',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
output_format_option
- ])
+ ]
+
+ cert_data = subprocess.check_output(cmd)
if output_format == 'base64':
cert_data = base64.b64encode(cert_data)
@@ -273,12 +346,127 @@ class NSSDatabase(object):
def remove_cert(self, nickname):
- subprocess.check_call([
+ cmd = [
'certutil',
'-D',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname
- ])
+ ]
+
+ subprocess.check_call(cmd)
+
+ def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ file_type = get_file_type(cert_chain_file)
+
+ if file_type == 'cert': # import single PEM cert
+ self.add_cert(
+ nickname=nickname,
+ cert_file=cert_chain_file,
+ trust_attributes=trust_attributes)
+ return self.get_cert(
+ nickname=nickname,
+ output_format='base64')
+
+ elif file_type == 'pkcs7': # import PKCS #7 cert chain
+ return self.import_pkcs7(
+ pkcs7_file=cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes,
+ output_format='base64')
+
+ else: # import PKCS #7 data without header/footer
+ with open(cert_chain_file, 'r') as f:
+ base64_data = f.read()
+ pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem')
+
+ tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b')
+ with open(tmp_cert_chain_file, 'w') as f:
+ f.write(pkcs7_data)
+
+ self.import_pkcs7(
+ pkcs7_file=tmp_cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes)
+
+ return base64_data
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ # export certs from PKCS #7 into PEM output
+ output = subprocess.check_output([
+ 'openssl',
+ 'pkcs7',
+ '-print_certs',
+ '-in', pkcs7_file
+ ])
+
+ # parse PEM output into separate PEM certificates
+ certs = []
+ lines = []
+ state = 'header'
+
+ for line in output.splitlines():
+
+ if state == 'header':
+ if line != CERT_HEADER:
+ # ignore header lines
+ pass
+ else:
+ # save cert header
+ lines.append(line)
+ state = 'body'
+
+ elif state == 'body':
+ if line != CERT_FOOTER:
+ # save cert body
+ lines.append(line)
+ else:
+ # save cert footer
+ lines.append(line)
+
+ # construct PEM cert
+ cert = '\n'.join(lines)
+ certs.append(cert)
+ lines = []
+ state = 'header'
+
+ # import PEM certs into NSS database
+ counter = 1
+ for cert in certs:
+
+ cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter)
+ with open(cert_file, 'w') as f:
+ f.write(cert)
+
+ if counter == 1:
+ n = nickname
+ else:
+ n = '%s #%d' % (nickname, counter)
+
+ self.add_cert(n, cert_file, trust_attributes)
+
+ counter += 1
+
+ # convert PKCS #7 data to the requested format
+ with open(pkcs7_file, 'r') as f:
+ data = f.read()
+
+ return convert_pkcs7(data, 'pem', output_format)
+
+ finally:
+ shutil.rmtree(tmpdir)
def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
@@ -296,13 +484,16 @@ class NSSDatabase(object):
else:
raise Exception('Missing PKCS #12 password')
- subprocess.check_call([
+ cmd = [
'pk12util',
'-d', self.directory,
+ '-h', self.token,
'-k', self.password_file,
'-i', pkcs12_file,
'-w', password_file
- ])
+ ]
+
+ subprocess.check_call(cmd)
finally:
shutil.rmtree(tmpdir)
@@ -323,14 +514,16 @@ class NSSDatabase(object):
else:
raise Exception('Missing PKCS #12 password')
- subprocess.check_call([
+ cmd = [
'pk12util',
'-d', self.directory,
'-k', self.password_file,
'-o', pkcs12_file,
'-w', password_file,
'-n', nickname
- ])
+ ]
+
+ subprocess.check_call(cmd)
finally:
shutil.rmtree(tmpdir)