summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/nssdb.py
diff options
context:
space:
mode:
Diffstat (limited to 'base/common/python/pki/nssdb.py')
-rw-r--r--base/common/python/pki/nssdb.py533
1 files changed, 533 insertions, 0 deletions
diff --git a/base/common/python/pki/nssdb.py b/base/common/python/pki/nssdb.py
new file mode 100644
index 000000000..67fd90b4c
--- /dev/null
+++ b/base/common/python/pki/nssdb.py
@@ -0,0 +1,533 @@
+#!/usr/bin/python
+# Authors:
+# Endi S. Dewata <edewata@redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Copyright (C) 2015 Red Hat, Inc.
+# All rights reserved.
+#
+
+import base64
+import os
+import shutil
+import subprocess
+import tempfile
+
+
+CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
+CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----'
+CERT_FOOTER = '-----END CERTIFICATE-----'
+
+PKCS7_HEADER = '-----BEGIN PKCS7-----'
+PKCS7_FOOTER = '-----END PKCS7-----'
+
+
+def convert_data(data, input_format, output_format, header=None, footer=None):
+
+ if input_format == output_format:
+ return data
+
+ if input_format == 'base64' and output_format == 'pem':
+
+ # join base-64 data into a single line
+ data = data.replace('\r', '').replace('\n', '')
+
+ # re-split the line into fixed-length lines
+ lines = [data[i:i+64] for i in range(0, len(data), 64)]
+
+ # add header and footer
+ return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
+
+ if input_format == 'pem' and output_format == 'base64':
+
+ # join multiple lines into a single line
+ lines = []
+ for line in data.splitlines():
+ line = line.rstrip('\r\n')
+ if line == header:
+ continue
+ if line == footer:
+ continue
+ lines.append(line)
+
+ return ''.join(lines)
+
+ raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
+
+def convert_csr(csr_data, input_format, output_format):
+
+ return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
+
+def convert_cert(cert_data, input_format, output_format):
+
+ return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+
+def convert_pkcs7(pkcs7_data, input_format, output_format):
+
+ return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER)
+
+def get_file_type(filename):
+
+ with open(filename, 'r') as f:
+ data = f.read()
+
+ if data.startswith(CSR_HEADER):
+ return 'csr'
+
+ if data.startswith(CERT_HEADER):
+ return 'cert'
+
+ if data.startswith(PKCS7_HEADER):
+ return 'pkcs7'
+
+ return None
+
+
+class NSSDatabase(object):
+
+ def __init__(self, directory, token='internal', password=None, password_file=None):
+ self.directory = directory
+ self.token = token
+
+ self.tmpdir = tempfile.mkdtemp()
+
+ if password:
+ self.password_file = os.path.join(self.tmpdir, 'password.txt')
+ with open(self.password_file, 'w') as f:
+ f.write(password)
+
+ elif password_file:
+ self.password_file = password_file
+
+ else:
+ raise Exception('Missing NSS database password')
+
+ def close(self):
+ shutil.rmtree(self.tmpdir)
+
+ def add_cert(self,
+ nickname,
+ cert_file,
+ trust_attributes=',,'):
+
+ cmd = [
+ 'certutil',
+ '-A',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ '-i', cert_file,
+ '-t', trust_attributes
+ ]
+
+ subprocess.check_call(cmd)
+
+ def modify_cert(self,
+ nickname,
+ trust_attributes):
+
+ cmd = [
+ 'certutil',
+ '-M',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ '-t', trust_attributes
+ ]
+
+ subprocess.check_call(cmd)
+
+ def create_noise(self, noise_file, size=2048):
+
+ subprocess.check_call([
+ 'openssl',
+ 'rand',
+ '-out', noise_file,
+ str(size)
+ ])
+
+ def create_request(self,
+ subject_dn,
+ request_file,
+ noise_file=None,
+ key_type=None,
+ key_size=None,
+ curve=None,
+ hash_alg=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if not noise_file:
+ noise_file = os.path.join(tmpdir, 'noise.bin')
+ if key_size:
+ size = key_size
+ else:
+ size = 2048
+ self.create_noise(
+ noise_file=noise_file,
+ size=size)
+
+ binary_request_file = os.path.join(tmpdir, 'request.bin')
+
+ cmd = [
+ 'certutil',
+ '-R',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-s', subject_dn,
+ '-o', binary_request_file,
+ '-z', noise_file
+ ]
+
+ if key_type:
+ cmd.extend(['-k', key_type])
+
+ if key_size:
+ cmd.extend(['-g', str(key_size)])
+
+ if curve:
+ cmd.extend(['-q', curve])
+
+ if hash_alg:
+ cmd.extend(['-Z', hash_alg])
+
+ # generate binary request
+ subprocess.check_call(cmd)
+
+ # encode binary request in base-64
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
+ subprocess.check_call([
+ 'BtoA', binary_request_file, b64_request_file])
+
+ # read base-64 request
+ with open(b64_request_file, 'r') as f:
+ b64_request = f.read()
+
+ # add header and footer
+ with open(request_file, 'w') as f:
+ f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
+ f.write(b64_request)
+ f.write('-----END NEW CERTIFICATE REQUEST-----\n')
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def create_self_signed_ca_cert(self,
+ subject_dn,
+ request_file,
+ cert_file,
+ serial='1',
+ validity=240):
+
+ cmd = [
+ 'certutil',
+ '-C',
+ '-x',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-c', subject_dn,
+ '-a',
+ '-i', request_file,
+ '-o', cert_file,
+ '-m', serial,
+ '-v', str(validity),
+ '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
+ '-2',
+ '-3',
+ '--extSKID',
+ '--extAIA'
+ ]
+
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ keystroke = ''
+
+ # Is this a CA certificate [y/N]?
+ keystroke += 'y\n'
+
+ # Enter the path length constraint, enter to skip [<0 for unlimited path]:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += 'y\n'
+
+ # Enter value for the authKeyID extension [y/N]?
+ keystroke += 'y\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Enter value for the authCertSerial field, enter to omit:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Adding Subject Key ID extension.
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # Enter access method type for Authority Information Access extension:
+ keystroke += '2\n'
+
+ # Select one of the following general name type:
+ keystroke += '7\n'
+
+ # TODO: replace with actual hostname name and port number
+ # Enter data:
+ keystroke += 'http://server.example.com:8080/ca/ocsp\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Add another location to the Authority Information Access extension [y/N]
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ p.communicate(keystroke)
+
+ rc = p.wait()
+
+ if rc:
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc)
+
+ def get_cert(self, nickname, output_format='pem'):
+
+ if output_format == 'pem':
+ output_format_option = '-a'
+
+ elif output_format == 'base64':
+ output_format_option = '-r'
+
+ else:
+ raise Exception('Unsupported output format: %s' % output_format)
+
+ cmd = [
+ 'certutil',
+ '-L',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname,
+ output_format_option
+ ]
+
+ cert_data = subprocess.check_output(cmd)
+
+ if output_format == 'base64':
+ cert_data = base64.b64encode(cert_data)
+
+ return cert_data
+
+ def remove_cert(self, nickname):
+
+ cmd = [
+ 'certutil',
+ '-D',
+ '-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
+ '-n', nickname
+ ]
+
+ subprocess.check_call(cmd)
+
+ def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ file_type = get_file_type(cert_chain_file)
+
+ if file_type == 'cert': # import single PEM cert
+ self.add_cert(
+ nickname=nickname,
+ cert_file=cert_chain_file,
+ trust_attributes=trust_attributes)
+ return self.get_cert(
+ nickname=nickname,
+ output_format='base64')
+
+ elif file_type == 'pkcs7': # import PKCS #7 cert chain
+ return self.import_pkcs7(
+ pkcs7_file=cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes,
+ output_format='base64')
+
+ else: # import PKCS #7 data without header/footer
+ with open(cert_chain_file, 'r') as f:
+ base64_data = f.read()
+ pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem')
+
+ tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b')
+ with open(tmp_cert_chain_file, 'w') as f:
+ f.write(pkcs7_data)
+
+ self.import_pkcs7(
+ pkcs7_file=tmp_cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes)
+
+ return base64_data
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ # export certs from PKCS #7 into PEM output
+ output = subprocess.check_output([
+ 'openssl',
+ 'pkcs7',
+ '-print_certs',
+ '-in', pkcs7_file
+ ])
+
+ # parse PEM output into separate PEM certificates
+ certs = []
+ lines = []
+ state = 'header'
+
+ for line in output.splitlines():
+
+ if state == 'header':
+ if line != CERT_HEADER:
+ # ignore header lines
+ pass
+ else:
+ # save cert header
+ lines.append(line)
+ state = 'body'
+
+ elif state == 'body':
+ if line != CERT_FOOTER:
+ # save cert body
+ lines.append(line)
+ else:
+ # save cert footer
+ lines.append(line)
+
+ # construct PEM cert
+ cert = '\n'.join(lines)
+ certs.append(cert)
+ lines = []
+ state = 'header'
+
+ # import PEM certs into NSS database
+ counter = 1
+ for cert in certs:
+
+ cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter)
+ with open(cert_file, 'w') as f:
+ f.write(cert)
+
+ if counter == 1:
+ n = nickname
+ else:
+ n = '%s #%d' % (nickname, counter)
+
+ self.add_cert(n, cert_file, trust_attributes)
+
+ counter += 1
+
+ # convert PKCS #7 data to the requested format
+ with open(pkcs7_file, 'r') as f:
+ data = f.read()
+
+ return convert_pkcs7(data, 'pem', output_format)
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ cmd = [
+ 'pk12util',
+ '-d', self.directory,
+ '-h', self.token,
+ '-k', self.password_file,
+ '-i', pkcs12_file,
+ '-w', password_file
+ ]
+
+ subprocess.check_call(cmd)
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ cmd = [
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-o', pkcs12_file,
+ '-w', password_file,
+ '-n', nickname
+ ]
+
+ subprocess.check_call(cmd)
+
+ finally:
+ shutil.rmtree(tmpdir)