From 9609f4e6035d3cdff19a0f78caee2d08b095c8ba Mon Sep 17 00:00:00 2001 From: "Endi S. Dewata" Date: Fri, 22 Jan 2016 17:34:19 +0100 Subject: Renamed pki.nss into pki.nssdb. The pki.nss module has been renamed into pki.nssdb to prevent conflicts with the nss module. https://fedorahosted.org/pki/ticket/456 --- base/common/python/pki/nss.py | 532 ---------------------------------------- base/common/python/pki/nssdb.py | 532 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 532 insertions(+), 532 deletions(-) delete mode 100644 base/common/python/pki/nss.py create mode 100644 base/common/python/pki/nssdb.py (limited to 'base/common/python') diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py deleted file mode 100644 index 44e286853..000000000 --- a/base/common/python/pki/nss.py +++ /dev/null @@ -1,532 +0,0 @@ -# Authors: -# Endi S. Dewata -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; version 2 of the License. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -# -# Copyright (C) 2015 Red Hat, Inc. -# All rights reserved. -# - -import base64 -import os -import shutil -import subprocess -import tempfile - - -CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----' -CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----' - -CERT_HEADER = '-----BEGIN CERTIFICATE-----' -CERT_FOOTER = '-----END CERTIFICATE-----' - -PKCS7_HEADER = '-----BEGIN PKCS7-----' -PKCS7_FOOTER = '-----END PKCS7-----' - - -def convert_data(data, input_format, output_format, header=None, footer=None): - - if input_format == output_format: - return data - - if input_format == 'base64' and output_format == 'pem': - - # join base-64 data into a single line - data = data.replace('\r', '').replace('\n', '') - - # re-split the line into fixed-length lines - lines = [data[i:i+64] for i in range(0, len(data), 64)] - - # add header and footer - return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer) - - if input_format == 'pem' and output_format == 'base64': - - # join multiple lines into a single line - lines = [] - for line in data.splitlines(): - line = line.rstrip('\r\n') - if line == header: - continue - if line == footer: - continue - lines.append(line) - - return ''.join(lines) - - raise Exception('Unable to convert data from %s to %s' % (input_format, output_format)) - -def convert_csr(csr_data, input_format, output_format): - - return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER) - -def convert_cert(cert_data, input_format, output_format): - - return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER) - -def convert_pkcs7(pkcs7_data, input_format, output_format): - - return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER) - -def get_file_type(filename): - - with open(filename, 'r') as f: - data = f.read() - - if data.startswith(CSR_HEADER): - return 'csr' - - if data.startswith(CERT_HEADER): - return 'cert' - - if data.startswith(PKCS7_HEADER): - return 'pkcs7' - - return None - - -class NSSDatabase(object): - - def __init__(self, directory, token='internal', password=None, password_file=None): - self.directory = directory - self.token = token - - self.tmpdir = tempfile.mkdtemp() - - if password: - self.password_file = os.path.join(self.tmpdir, 'password.txt') - with open(self.password_file, 'w') as f: - f.write(password) - - elif password_file: - self.password_file = password_file - - else: - raise Exception('Missing NSS database password') - - def close(self): - shutil.rmtree(self.tmpdir) - - def add_cert(self, - nickname, - cert_file, - trust_attributes=',,'): - - cmd = [ - 'certutil', - '-A', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-n', nickname, - '-i', cert_file, - '-t', trust_attributes - ] - - subprocess.check_call(cmd) - - def modify_cert(self, - nickname, - trust_attributes): - - cmd = [ - 'certutil', - '-M', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-n', nickname, - '-t', trust_attributes - ] - - subprocess.check_call(cmd) - - def create_noise(self, noise_file, size=2048): - - subprocess.check_call([ - 'openssl', - 'rand', - '-out', noise_file, - str(size) - ]) - - def create_request(self, - subject_dn, - request_file, - noise_file=None, - key_type=None, - key_size=None, - curve=None, - hash_alg=None): - - tmpdir = tempfile.mkdtemp() - - try: - if not noise_file: - noise_file = os.path.join(tmpdir, 'noise.bin') - if key_size: - size = key_size - else: - size = 2048 - self.create_noise( - noise_file=noise_file, - size=size) - - binary_request_file = os.path.join(tmpdir, 'request.bin') - - cmd = [ - 'certutil', - '-R', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-s', subject_dn, - '-o', binary_request_file, - '-z', noise_file - ] - - if key_type: - cmd.extend(['-k', key_type]) - - if key_size: - cmd.extend(['-g', str(key_size)]) - - if curve: - cmd.extend(['-q', curve]) - - if hash_alg: - cmd.extend(['-Z', hash_alg]) - - # generate binary request - subprocess.check_call(cmd) - - # encode binary request in base-64 - b64_request_file = os.path.join(tmpdir, 'request.b64') - subprocess.check_call([ - 'BtoA', binary_request_file, b64_request_file]) - - # read base-64 request - with open(b64_request_file, 'r') as f: - b64_request = f.read() - - # add header and footer - with open(request_file, 'w') as f: - f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n') - f.write(b64_request) - f.write('-----END NEW CERTIFICATE REQUEST-----\n') - - finally: - shutil.rmtree(tmpdir) - - def create_self_signed_ca_cert(self, - subject_dn, - request_file, - cert_file, - serial='1', - validity=240): - - cmd = [ - 'certutil', - '-C', - '-x', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-c', subject_dn, - '-a', - '-i', request_file, - '-o', cert_file, - '-m', serial, - '-v', str(validity), - '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical', - '-2', - '-3', - '--extSKID', - '--extAIA' - ] - - p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - - keystroke = '' - - # Is this a CA certificate [y/N]? - keystroke += 'y\n' - - # Enter the path length constraint, enter to skip [<0 for unlimited path]: - keystroke += '\n' - - # Is this a critical extension [y/N]? - keystroke += 'y\n' - - # Enter value for the authKeyID extension [y/N]? - keystroke += 'y\n' - - # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId()) - # Enter value for the key identifier fields,enter to omit: - keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n' - - # Select one of the following general name type: - keystroke += '0\n' - - # Enter value for the authCertSerial field, enter to omit: - keystroke += '\n' - - # Is this a critical extension [y/N]? - keystroke += '\n' - - # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId()) - # Adding Subject Key ID extension. - # Enter value for the key identifier fields,enter to omit: - keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n' - - # Is this a critical extension [y/N]? - keystroke += '\n' - - # Enter access method type for Authority Information Access extension: - keystroke += '2\n' - - # Select one of the following general name type: - keystroke += '7\n' - - # TODO: replace with actual hostname name and port number - # Enter data: - keystroke += 'http://server.example.com:8080/ca/ocsp\n' - - # Select one of the following general name type: - keystroke += '0\n' - - # Add another location to the Authority Information Access extension [y/N] - keystroke += '\n' - - # Is this a critical extension [y/N]? - keystroke += '\n' - - p.communicate(keystroke) - - rc = p.wait() - - if rc: - raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc) - - def get_cert(self, nickname, output_format='pem'): - - if output_format == 'pem': - output_format_option = '-a' - - elif output_format == 'base64': - output_format_option = '-r' - - else: - raise Exception('Unsupported output format: %s' % output_format) - - cmd = [ - 'certutil', - '-L', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-n', nickname, - output_format_option - ] - - cert_data = subprocess.check_output(cmd) - - if output_format == 'base64': - cert_data = base64.b64encode(cert_data) - - return cert_data - - def remove_cert(self, nickname): - - cmd = [ - 'certutil', - '-D', - '-d', self.directory, - '-h', self.token, - '-f', self.password_file, - '-n', nickname - ] - - subprocess.check_call(cmd) - - def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None): - - tmpdir = tempfile.mkdtemp() - - try: - file_type = get_file_type(cert_chain_file) - - if file_type == 'cert': # import single PEM cert - self.add_cert( - nickname=nickname, - cert_file=cert_chain_file, - trust_attributes=trust_attributes) - return self.get_cert( - nickname=nickname, - output_format='base64') - - elif file_type == 'pkcs7': # import PKCS #7 cert chain - return self.import_pkcs7( - pkcs7_file=cert_chain_file, - nickname=nickname, - trust_attributes=trust_attributes, - output_format='base64') - - else: # import PKCS #7 data without header/footer - with open(cert_chain_file, 'r') as f: - base64_data = f.read() - pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem') - - tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b') - with open(tmp_cert_chain_file, 'w') as f: - f.write(pkcs7_data) - - self.import_pkcs7( - pkcs7_file=tmp_cert_chain_file, - nickname=nickname, - trust_attributes=trust_attributes) - - return base64_data - - finally: - shutil.rmtree(tmpdir) - - def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'): - - tmpdir = tempfile.mkdtemp() - - try: - # export certs from PKCS #7 into PEM output - output = subprocess.check_output([ - 'openssl', - 'pkcs7', - '-print_certs', - '-in', pkcs7_file - ]) - - # parse PEM output into separate PEM certificates - certs = [] - lines = [] - state = 'header' - - for line in output.splitlines(): - - if state == 'header': - if line != CERT_HEADER: - # ignore header lines - pass - else: - # save cert header - lines.append(line) - state = 'body' - - elif state == 'body': - if line != CERT_FOOTER: - # save cert body - lines.append(line) - else: - # save cert footer - lines.append(line) - - # construct PEM cert - cert = '\n'.join(lines) - certs.append(cert) - lines = [] - state = 'header' - - # import PEM certs into NSS database - counter = 1 - for cert in certs: - - cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter) - with open(cert_file, 'w') as f: - f.write(cert) - - if counter == 1: - n = nickname - else: - n = '%s #%d' % (nickname, counter) - - self.add_cert(n, cert_file, trust_attributes) - - counter += 1 - - # convert PKCS #7 data to the requested format - with open(pkcs7_file, 'r') as f: - data = f.read() - - return convert_pkcs7(data, 'pem', output_format) - - finally: - shutil.rmtree(tmpdir) - - def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None): - - tmpdir = tempfile.mkdtemp() - - try: - if pkcs12_password: - password_file = os.path.join(tmpdir, 'password.txt') - with open(password_file, 'w') as f: - f.write(pkcs12_password) - - elif pkcs12_password_file: - password_file = pkcs12_password_file - - else: - raise Exception('Missing PKCS #12 password') - - cmd = [ - 'pk12util', - '-d', self.directory, - '-h', self.token, - '-k', self.password_file, - '-i', pkcs12_file, - '-w', password_file - ] - - subprocess.check_call(cmd) - - finally: - shutil.rmtree(tmpdir) - - def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None): - - tmpdir = tempfile.mkdtemp() - - try: - if pkcs12_password: - password_file = os.path.join(tmpdir, 'password.txt') - with open(password_file, 'w') as f: - f.write(pkcs12_password) - - elif pkcs12_password_file: - password_file = pkcs12_password_file - - else: - raise Exception('Missing PKCS #12 password') - - cmd = [ - 'pk12util', - '-d', self.directory, - '-k', self.password_file, - '-o', pkcs12_file, - '-w', password_file, - '-n', nickname - ] - - subprocess.check_call(cmd) - - finally: - shutil.rmtree(tmpdir) diff --git a/base/common/python/pki/nssdb.py b/base/common/python/pki/nssdb.py new file mode 100644 index 000000000..44e286853 --- /dev/null +++ b/base/common/python/pki/nssdb.py @@ -0,0 +1,532 @@ +# Authors: +# Endi S. Dewata +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright (C) 2015 Red Hat, Inc. +# All rights reserved. +# + +import base64 +import os +import shutil +import subprocess +import tempfile + + +CSR_HEADER = '-----BEGIN NEW CERTIFICATE REQUEST-----' +CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----' + +CERT_HEADER = '-----BEGIN CERTIFICATE-----' +CERT_FOOTER = '-----END CERTIFICATE-----' + +PKCS7_HEADER = '-----BEGIN PKCS7-----' +PKCS7_FOOTER = '-----END PKCS7-----' + + +def convert_data(data, input_format, output_format, header=None, footer=None): + + if input_format == output_format: + return data + + if input_format == 'base64' and output_format == 'pem': + + # join base-64 data into a single line + data = data.replace('\r', '').replace('\n', '') + + # re-split the line into fixed-length lines + lines = [data[i:i+64] for i in range(0, len(data), 64)] + + # add header and footer + return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer) + + if input_format == 'pem' and output_format == 'base64': + + # join multiple lines into a single line + lines = [] + for line in data.splitlines(): + line = line.rstrip('\r\n') + if line == header: + continue + if line == footer: + continue + lines.append(line) + + return ''.join(lines) + + raise Exception('Unable to convert data from %s to %s' % (input_format, output_format)) + +def convert_csr(csr_data, input_format, output_format): + + return convert_data(csr_data, input_format, output_format, CSR_HEADER, CSR_FOOTER) + +def convert_cert(cert_data, input_format, output_format): + + return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER) + +def convert_pkcs7(pkcs7_data, input_format, output_format): + + return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER) + +def get_file_type(filename): + + with open(filename, 'r') as f: + data = f.read() + + if data.startswith(CSR_HEADER): + return 'csr' + + if data.startswith(CERT_HEADER): + return 'cert' + + if data.startswith(PKCS7_HEADER): + return 'pkcs7' + + return None + + +class NSSDatabase(object): + + def __init__(self, directory, token='internal', password=None, password_file=None): + self.directory = directory + self.token = token + + self.tmpdir = tempfile.mkdtemp() + + if password: + self.password_file = os.path.join(self.tmpdir, 'password.txt') + with open(self.password_file, 'w') as f: + f.write(password) + + elif password_file: + self.password_file = password_file + + else: + raise Exception('Missing NSS database password') + + def close(self): + shutil.rmtree(self.tmpdir) + + def add_cert(self, + nickname, + cert_file, + trust_attributes=',,'): + + cmd = [ + 'certutil', + '-A', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-n', nickname, + '-i', cert_file, + '-t', trust_attributes + ] + + subprocess.check_call(cmd) + + def modify_cert(self, + nickname, + trust_attributes): + + cmd = [ + 'certutil', + '-M', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-n', nickname, + '-t', trust_attributes + ] + + subprocess.check_call(cmd) + + def create_noise(self, noise_file, size=2048): + + subprocess.check_call([ + 'openssl', + 'rand', + '-out', noise_file, + str(size) + ]) + + def create_request(self, + subject_dn, + request_file, + noise_file=None, + key_type=None, + key_size=None, + curve=None, + hash_alg=None): + + tmpdir = tempfile.mkdtemp() + + try: + if not noise_file: + noise_file = os.path.join(tmpdir, 'noise.bin') + if key_size: + size = key_size + else: + size = 2048 + self.create_noise( + noise_file=noise_file, + size=size) + + binary_request_file = os.path.join(tmpdir, 'request.bin') + + cmd = [ + 'certutil', + '-R', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-s', subject_dn, + '-o', binary_request_file, + '-z', noise_file + ] + + if key_type: + cmd.extend(['-k', key_type]) + + if key_size: + cmd.extend(['-g', str(key_size)]) + + if curve: + cmd.extend(['-q', curve]) + + if hash_alg: + cmd.extend(['-Z', hash_alg]) + + # generate binary request + subprocess.check_call(cmd) + + # encode binary request in base-64 + b64_request_file = os.path.join(tmpdir, 'request.b64') + subprocess.check_call([ + 'BtoA', binary_request_file, b64_request_file]) + + # read base-64 request + with open(b64_request_file, 'r') as f: + b64_request = f.read() + + # add header and footer + with open(request_file, 'w') as f: + f.write('-----BEGIN NEW CERTIFICATE REQUEST-----\n') + f.write(b64_request) + f.write('-----END NEW CERTIFICATE REQUEST-----\n') + + finally: + shutil.rmtree(tmpdir) + + def create_self_signed_ca_cert(self, + subject_dn, + request_file, + cert_file, + serial='1', + validity=240): + + cmd = [ + 'certutil', + '-C', + '-x', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-c', subject_dn, + '-a', + '-i', request_file, + '-o', cert_file, + '-m', serial, + '-v', str(validity), + '--keyUsage', 'digitalSignature,nonRepudiation,certSigning,crlSigning,critical', + '-2', + '-3', + '--extSKID', + '--extAIA' + ] + + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + keystroke = '' + + # Is this a CA certificate [y/N]? + keystroke += 'y\n' + + # Enter the path length constraint, enter to skip [<0 for unlimited path]: + keystroke += '\n' + + # Is this a critical extension [y/N]? + keystroke += 'y\n' + + # Enter value for the authKeyID extension [y/N]? + keystroke += 'y\n' + + # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId()) + # Enter value for the key identifier fields,enter to omit: + keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n' + + # Select one of the following general name type: + keystroke += '0\n' + + # Enter value for the authCertSerial field, enter to omit: + keystroke += '\n' + + # Is this a critical extension [y/N]? + keystroke += '\n' + + # TODO: generate SHA1 ID (see APolicyRule.formSHA1KeyId()) + # Adding Subject Key ID extension. + # Enter value for the key identifier fields,enter to omit: + keystroke += '2d:7e:83:37:75:5a:fd:0e:8d:52:a3:70:16:93:36:b8:4a:d6:84:9f\n' + + # Is this a critical extension [y/N]? + keystroke += '\n' + + # Enter access method type for Authority Information Access extension: + keystroke += '2\n' + + # Select one of the following general name type: + keystroke += '7\n' + + # TODO: replace with actual hostname name and port number + # Enter data: + keystroke += 'http://server.example.com:8080/ca/ocsp\n' + + # Select one of the following general name type: + keystroke += '0\n' + + # Add another location to the Authority Information Access extension [y/N] + keystroke += '\n' + + # Is this a critical extension [y/N]? + keystroke += '\n' + + p.communicate(keystroke) + + rc = p.wait() + + if rc: + raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc) + + def get_cert(self, nickname, output_format='pem'): + + if output_format == 'pem': + output_format_option = '-a' + + elif output_format == 'base64': + output_format_option = '-r' + + else: + raise Exception('Unsupported output format: %s' % output_format) + + cmd = [ + 'certutil', + '-L', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-n', nickname, + output_format_option + ] + + cert_data = subprocess.check_output(cmd) + + if output_format == 'base64': + cert_data = base64.b64encode(cert_data) + + return cert_data + + def remove_cert(self, nickname): + + cmd = [ + 'certutil', + '-D', + '-d', self.directory, + '-h', self.token, + '-f', self.password_file, + '-n', nickname + ] + + subprocess.check_call(cmd) + + def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None): + + tmpdir = tempfile.mkdtemp() + + try: + file_type = get_file_type(cert_chain_file) + + if file_type == 'cert': # import single PEM cert + self.add_cert( + nickname=nickname, + cert_file=cert_chain_file, + trust_attributes=trust_attributes) + return self.get_cert( + nickname=nickname, + output_format='base64') + + elif file_type == 'pkcs7': # import PKCS #7 cert chain + return self.import_pkcs7( + pkcs7_file=cert_chain_file, + nickname=nickname, + trust_attributes=trust_attributes, + output_format='base64') + + else: # import PKCS #7 data without header/footer + with open(cert_chain_file, 'r') as f: + base64_data = f.read() + pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem') + + tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b') + with open(tmp_cert_chain_file, 'w') as f: + f.write(pkcs7_data) + + self.import_pkcs7( + pkcs7_file=tmp_cert_chain_file, + nickname=nickname, + trust_attributes=trust_attributes) + + return base64_data + + finally: + shutil.rmtree(tmpdir) + + def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'): + + tmpdir = tempfile.mkdtemp() + + try: + # export certs from PKCS #7 into PEM output + output = subprocess.check_output([ + 'openssl', + 'pkcs7', + '-print_certs', + '-in', pkcs7_file + ]) + + # parse PEM output into separate PEM certificates + certs = [] + lines = [] + state = 'header' + + for line in output.splitlines(): + + if state == 'header': + if line != CERT_HEADER: + # ignore header lines + pass + else: + # save cert header + lines.append(line) + state = 'body' + + elif state == 'body': + if line != CERT_FOOTER: + # save cert body + lines.append(line) + else: + # save cert footer + lines.append(line) + + # construct PEM cert + cert = '\n'.join(lines) + certs.append(cert) + lines = [] + state = 'header' + + # import PEM certs into NSS database + counter = 1 + for cert in certs: + + cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter) + with open(cert_file, 'w') as f: + f.write(cert) + + if counter == 1: + n = nickname + else: + n = '%s #%d' % (nickname, counter) + + self.add_cert(n, cert_file, trust_attributes) + + counter += 1 + + # convert PKCS #7 data to the requested format + with open(pkcs7_file, 'r') as f: + data = f.read() + + return convert_pkcs7(data, 'pem', output_format) + + finally: + shutil.rmtree(tmpdir) + + def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None): + + tmpdir = tempfile.mkdtemp() + + try: + if pkcs12_password: + password_file = os.path.join(tmpdir, 'password.txt') + with open(password_file, 'w') as f: + f.write(pkcs12_password) + + elif pkcs12_password_file: + password_file = pkcs12_password_file + + else: + raise Exception('Missing PKCS #12 password') + + cmd = [ + 'pk12util', + '-d', self.directory, + '-h', self.token, + '-k', self.password_file, + '-i', pkcs12_file, + '-w', password_file + ] + + subprocess.check_call(cmd) + + finally: + shutil.rmtree(tmpdir) + + def export_pkcs12(self, pkcs12_file, nickname, pkcs12_password=None, pkcs12_password_file=None): + + tmpdir = tempfile.mkdtemp() + + try: + if pkcs12_password: + password_file = os.path.join(tmpdir, 'password.txt') + with open(password_file, 'w') as f: + f.write(pkcs12_password) + + elif pkcs12_password_file: + password_file = pkcs12_password_file + + else: + raise Exception('Missing PKCS #12 password') + + cmd = [ + 'pk12util', + '-d', self.directory, + '-k', self.password_file, + '-o', pkcs12_file, + '-w', password_file, + '-n', nickname + ] + + subprocess.check_call(cmd) + + finally: + shutil.rmtree(tmpdir) -- cgit