summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/nss.py
diff options
context:
space:
mode:
Diffstat (limited to 'base/common/python/pki/nss.py')
-rw-r--r--base/common/python/pki/nss.py336
1 files changed, 336 insertions, 0 deletions
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
new file mode 100644
index 000000000..f36b771f8
--- /dev/null
+++ b/base/common/python/pki/nss.py
@@ -0,0 +1,336 @@
+#!/usr/bin/python
+# Authors:
+# Endi S. Dewata <edewata@redhat.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# Copyright (C) 2015 Red Hat, Inc.
+# All rights reserved.
+#
+
+import base64
+import os
+import shutil
+import subprocess
+import tempfile
+
+
+CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----'
+CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
+
+CERT_HEADER = '-----BEGIN CERTIFICATE-----'
+CERT_FOOTER = '-----END CERTIFICATE-----'
+
+
+def convert_data(data, input_format, output_format, header=None, footer=None):
+
+ if input_format == 'base64' and output_format == 'pem':
+
+ # split a single line into multiple lines
+ lines = [data[i:i+64] for i in range(0, len(data), 64)]
+ return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
+
+ if input_format == 'pem' and output_format == 'base64':
+
+ # join multiple lines into a single line
+ lines = []
+ for line in data.splitlines():
+ line = line.rstrip('\r\n')
+ if line == header:
+ continue
+ if line == footer:
+ continue
+ lines.append(line)
+
+ return ''.join(lines)
+
+ raise Exception('Unable to convert data from %s to %s' % (input_format, output_format))
+
+def convert_csr(csr_data, input_format, output_format):
+
+ return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER)
+
+def convert_cert(cert_data, input_format, output_format):
+
+ return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+
+
+class NSSDatabase(object):
+
+ def __init__(self, directory, password=None, password_file=None):
+ self.directory = directory
+
+ self.tmpdir = tempfile.mkdtemp()
+
+ if password:
+ self.password_file = os.path.join(self.tmpdir, 'password.txt')
+ with open(self.password_file, 'w') as f:
+ f.write(password)
+
+ elif password_file:
+ self.password_file = password_file
+
+ else:
+ raise Exception('Missing NSS database password')
+
+ def close(self):
+ shutil.rmtree(self.tmpdir)
+
+ def add_cert(self,
+ nickname, cert_file,
+ trust_attributes='u,u,u'):
+
+ subprocess.check_call([
+ 'certutil',
+ '-A',
+ '-d', self.directory,
+ '-n', nickname,
+ '-i', cert_file,
+ '-t', trust_attributes
+ ])
+
+ def modify_cert(self,
+ nickname,
+ trust_attributes='u,u,u'):
+
+ subprocess.check_call([
+ 'certutil',
+ '-M',
+ '-d', self.directory,
+ '-n', nickname,
+ '-t', trust_attributes
+ ])
+
+ def create_noise(self, noise_file, size=2048):
+
+ subprocess.check_call([
+ 'openssl',
+ 'rand',
+ '-out', noise_file,
+ str(size)
+ ])
+
+ def create_request(self,
+ subject_dn,
+ noise_file,
+ request_file):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ binary_request_file = os.path.join(tmpdir, 'request.bin')
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
+
+ # generate binary request
+ subprocess.check_call([
+ 'certutil',
+ '-R',
+ '-d', self.directory,
+ '-f', self.password_file,
+ '-s', subject_dn,
+ '-z', noise_file,
+ '-o', binary_request_file
+ ])
+
+ # encode binary request in base-64
+ subprocess.check_call([
+ 'BtoA', binary_request_file, b64_request_file])
+
+ # read base-64 request
+ with open(b64_request_file, 'r') as f:
+ b64_request = f.read()
+
+ # add header and footer
+ with open(request_file, 'w') as f:
+ f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n')
+ f.write(b64_request)
+ f.write('-----END NEW CERTIFICATE REQUEST-----\n')
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def create_self_signed_ca_cert(self,
+ subject_dn,
+ request_file,
+ cert_file,
+ serial='1',
+ validity=240):
+
+ p = subprocess.Popen([
+ 'certutil',
+ '-C',
+ '-x',
+ '-d', self.directory,
+ '-f', self.password_file,
+ '-c', subject_dn,
+ '-a',
+ '-i', request_file,
+ '-o', cert_file,
+ '-m', serial,
+ '-v', str(validity),
+ '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical',
+ '-2',
+ '-3',
+ '--extSKID',
+ '--extAIA'
+ ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+
+ keystroke = ''
+
+ # Is this a CA certificate [y/N]?
+ keystroke += 'y\n'
+
+ # Enter the path length constraint, enter to skip [<0 for unlimited path]:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += 'y\n'
+
+ # Enter value for the authKeyID extension [y/N]?
+ keystroke += 'y\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Enter value for the authCertSerial field, enter to omit:
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId())
+ # Adding Subject Key ID extension.
+ # Enter value for the key identifier fields,enter to omit:
+ keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ # Enter access method type for Authority Information Access extension:
+ keystroke += '2\n'
+
+ # Select one of the following general name type:
+ keystroke += '7\n'
+
+ # TODO: replace with actual hostname name and port number
+ # Enter data:
+ keystroke += 'http://server.example.com:8080/ca/ocsp\n'
+
+ # Select one of the following general name type:
+ keystroke += '0\n'
+
+ # Add another location to the Authority Information Access extension [y/N]
+ keystroke += '\n'
+
+ # Is this a critical extension [y/N]?
+ keystroke += '\n'
+
+ p.communicate(keystroke)
+
+ rc = p.wait()
+
+ if rc:
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc)
+
+ def get_cert(self, nickname, output_format='pem'):
+
+ if output_format == 'pem':
+ output_format_option = '-a'
+
+ elif output_format == 'base64':
+ output_format_option = '-r'
+
+ else:
+ raise Exception('Unsupported output format: %s' % output_format)
+
+ cert_data = subprocess.check_output([
+ 'certutil',
+ '-L',
+ '-d', self.directory,
+ '-n', nickname,
+ output_format_option
+ ])
+
+ if output_format == 'base64':
+ cert_data = base64.b64encode(cert_data)
+
+ return cert_data
+
+ def remove_cert(self, nickname):
+
+ subprocess.check_call([
+ 'certutil',
+ '-D',
+ '-d', self.directory,
+ '-n', nickname
+ ])
+
+ def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ subprocess.check_call([
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-i', pkcs12_file,
+ '-w', password_file
+ ])
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ if pkcs12_password:
+ password_file = os.path.join(tmpdir, 'password.txt')
+ with open(password_file, 'w') as f:
+ f.write(pkcs12_password)
+
+ elif pkcs12_password_file:
+ password_file = pkcs12_password_file
+
+ else:
+ raise Exception('Missing PKCS #12 password')
+
+ subprocess.check_call([
+ 'pk12util',
+ '-d', self.directory,
+ '-k', self.password_file,
+ '-o', pkcs12_file,
+ '-w', password_file,
+ '-n', nickname
+ ])
+
+ finally:
+ shutil.rmtree(tmpdir)