diff options
Diffstat (limited to 'base/common/python/pki/nss.py')
-rw-r--r-- | base/common/python/pki/nss.py | 247 |
1 files changed, 220 insertions, 27 deletions
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py index f36b771f8..196fe462f 100644 --- a/base/common/python/pki/nss.py +++ b/base/common/python/pki/nss.py @@ -32,12 +32,19 @@ CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----' CERT_HEADER = '-----BEGIN CERTIFICATE-----' CERT_FOOTER = '-----END CERTIFICATE-----' +PKCS7_HEADER = '-----BEGIN PKCS7-----' +PKCS7_FOOTER = '-----END PKCS7-----' + def convert_data(data, input_format, output_format, header=None, footer=None): + if input_format == output_format: + return data + if input_format == 'base64' and output_format == 'pem': # split a single line into multiple lines + data = data.rstrip('\r\n') lines = [data[i:i+64] for i in range(0, len(data), 64)] return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer) @@ -65,11 +72,32 @@ def convert_cert(cert_data, input_format, output_format): return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER) +def convert_pkcs7(pkcs7_data, input_format, output_format): + + return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER) + +def get_file_type(filename): + + with open(filename, 'r') as f: + data = f.read() + + if data.startswith(CSR_HEADER): + return 'csr' + + if data.startswith(CERT_HEADER): + return 'cert' + + if data.startswith(PKCS7_HEADER): + return 'pkcs7' + + return None + class NSSDatabase(object): - def __init__(self, directory, password=None, password_file=None): + def __init__(self, directory, token='internal', password=None, password_file=None): self.directory = directory + self.token = token self.tmpdir = tempfile.mkdtemp() @@ -88,29 +116,38 @@ class NSSDatabase(object): shutil.rmtree(self.tmpdir) def add_cert(self, - nickname, cert_file, - trust_attributes='u,u,u'): + nickname, + cert_file, + trust_attributes=',,'): - subprocess.check_call([ + cmd = [ 'certutil', '-A', '-d', self.directory, + '-h', self.token, + '-f', self.password_file, '-n', nickname, '-i', cert_file, '-t', trust_attributes - ]) + ] + + subprocess.check_call(cmd) def modify_cert(self, nickname, - trust_attributes='u,u,u'): + trust_attributes): - subprocess.check_call([ + cmd = [ 'certutil', '-M', '-d', self.directory, + '-h', self.token, + '-f', self.password_file, '-n', nickname, '-t', trust_attributes - ]) + ] + + subprocess.check_call(cmd) def create_noise(self, noise_file, size=2048): @@ -123,27 +160,56 @@ class NSSDatabase(object): def create_request(self, subject_dn, - noise_file, - request_file): + request_file, + noise_file=None, + key_type=None, + key_size=None, + curve=None, + hash_alg=None): tmpdir = tempfile.mkdtemp() try: + if not noise_file: + noise_file = os.path.join(tmpdir, 'noise.bin') + if key_size: + size = key_size + else: + size = 2048 + self.create_noise( + noise_file=noise_file, + size=size) + binary_request_file = os.path.join(tmpdir, 'request.bin') - b64_request_file = os.path.join(tmpdir, 'request.b64') - # generate binary request - subprocess.check_call([ + cmd = [ 'certutil', '-R', '-d', self.directory, + '-h', self.token, '-f', self.password_file, '-s', subject_dn, - '-z', noise_file, - '-o', binary_request_file - ]) + '-o', binary_request_file, + '-z', noise_file + ] + + if key_type: + cmd.extend(['-k', key_type]) + + if key_size: + cmd.extend(['-g', str(key_size)]) + + if curve: + cmd.extend(['-q', curve]) + + if hash_alg: + cmd.extend(['-Z', hash_alg]) + + # generate binary request + subprocess.check_call(cmd) # encode binary request in base-64 + b64_request_file = os.path.join(tmpdir, 'request.b64') subprocess.check_call([ 'BtoA', binary_request_file, b64_request_file]) @@ -167,11 +233,12 @@ class NSSDatabase(object): serial='1', validity=240): - p = subprocess.Popen([ + cmd = [ 'certutil', '-C', '-x', '-d', self.directory, + '-h', self.token, '-f', self.password_file, '-c', subject_dn, '-a', @@ -184,7 +251,9 @@ class NSSDatabase(object): '-3', '--extSKID', '--extAIA' - ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + ] + + p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) keystroke = '' @@ -245,7 +314,7 @@ class NSSDatabase(object): rc = p.wait() if rc: - raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc) + raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc) def get_cert(self, nickname, output_format='pem'): @@ -258,13 +327,17 @@ class NSSDatabase(object): else: raise Exception('Unsupported output format: %s' % output_format) - cert_data = subprocess.check_output([ + cmd = [ 'certutil', '-L', '-d', self.directory, + '-h', self.token, + '-f', self.password_file, '-n', nickname, output_format_option - ]) + ] + + cert_data = subprocess.check_output(cmd) if output_format == 'base64': cert_data = base64.b64encode(cert_data) @@ -273,12 +346,127 @@ class NSSDatabase(object): def remove_cert(self, nickname): - subprocess.check_call([ + cmd = [ 'certutil', '-D', '-d', self.directory, + '-h', self.token, + '-f', self.password_file, '-n', nickname - ]) + ] + + subprocess.check_call(cmd) + + def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None): + + tmpdir = tempfile.mkdtemp() + + try: + file_type = get_file_type(cert_chain_file) + + if file_type == 'cert': # import single PEM cert + self.add_cert( + nickname=nickname, + cert_file=cert_chain_file, + trust_attributes=trust_attributes) + return self.get_cert( + nickname=nickname, + output_format='base64') + + elif file_type == 'pkcs7': # import PKCS #7 cert chain + return self.import_pkcs7( + pkcs7_file=cert_chain_file, + nickname=nickname, + trust_attributes=trust_attributes, + output_format='base64') + + else: # import PKCS #7 data without header/footer + with open(cert_chain_file, 'r') as f: + base64_data = f.read() + pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem') + + tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b') + with open(tmp_cert_chain_file, 'w') as f: + f.write(pkcs7_data) + + self.import_pkcs7( + pkcs7_file=tmp_cert_chain_file, + nickname=nickname, + trust_attributes=trust_attributes) + + return base64_data + + finally: + shutil.rmtree(tmpdir) + + def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'): + + tmpdir = tempfile.mkdtemp() + + try: + # export certs from PKCS #7 into PEM output + output = subprocess.check_output([ + 'openssl', + 'pkcs7', + '-print_certs', + '-in', pkcs7_file + ]) + + # parse PEM output into separate PEM certificates + certs = [] + lines = [] + state = 'header' + + for line in output.splitlines(): + + if state == 'header': + if line != CERT_HEADER: + # ignore header lines + pass + else: + # save cert header + lines.append(line) + state = 'body' + + elif state == 'body': + if line != CERT_FOOTER: + # save cert body + lines.append(line) + else: + # save cert footer + lines.append(line) + + # construct PEM cert + cert = '\n'.join(lines) + certs.append(cert) + lines = [] + state = 'header' + + # import PEM certs into NSS database + counter = 1 + for cert in certs: + + cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter) + with open(cert_file, 'w') as f: + f.write(cert) + + if counter == 1: + n = nickname + else: + n = '%s #%d' % (nickname, counter) + + self.add_cert(n, cert_file, trust_attributes) + + counter += 1 + + # convert PKCS #7 data to the requested format + with open(pkcs7_file, 'r') as f: + data = f.read() + + return convert_pkcs7(data, 'pem', output_format) + + finally: + shutil.rmtree(tmpdir) def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None): @@ -296,13 +484,16 @@ class NSSDatabase(object): else: raise Exception('Missing PKCS #12 password') - subprocess.check_call([ + cmd = [ 'pk12util', '-d', self.directory, + '-h', self.token, '-k', self.password_file, '-i', pkcs12_file, '-w', password_file - ]) + ] + + subprocess.check_call(cmd) finally: shutil.rmtree(tmpdir) @@ -323,14 +514,16 @@ class NSSDatabase(object): else: raise Exception('Missing PKCS #12 password') - subprocess.check_call([ + cmd = [ 'pk12util', '-d', self.directory, '-k', self.password_file, '-o', pkcs12_file, '-w', password_file, '-n', nickname - ]) + ] + + subprocess.check_call(cmd) finally: shutil.rmtree(tmpdir) |