summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/nss.py
diff options
context:
space:
mode:
Diffstat (limited to 'base/common/python/pki/nss.py')
-rw-r--r--base/common/python/pki/nss.py247
1 files changed, 220 insertions, 27 deletions
diff --git a/base/common/python/pki/nss.py b/base/common/python/pki/nss.py
index f36b771f8..196fe462f 100644
--- a/base/common/python/pki/nss.py
+++ b/base/common/python/pki/nss.py
@@ -32,12 +32,19 @@ CSR_FOOTER = '-----END NEW CERTIFICATE REQUEST-----'
CERT_HEADER = '-----BEGIN CERTIFICATE-----'
CERT_FOOTER = '-----END CERTIFICATE-----'
+PKCS7_HEADER = '-----BEGIN PKCS7-----'
+PKCS7_FOOTER = '-----END PKCS7-----'
+
def convert_data(data, input_format, output_format, header=None, footer=None):
+ if input_format == output_format:
+ return data
+
if input_format == 'base64' and output_format == 'pem':
# split a single line into multiple lines
+ data = data.rstrip('\r\n')
lines = [data[i:i+64] for i in range(0, len(data), 64)]
return '%s\n%s\n%s\n' % (header, '\n'.join(lines), footer)
@@ -65,11 +72,32 @@ def convert_cert(cert_data, input_format, output_format):
return convert_data(cert_data, input_format, output_format, CERT_HEADER, CERT_FOOTER)
+def convert_pkcs7(pkcs7_data, input_format, output_format):
+
+ return convert_data(pkcs7_data, input_format, output_format, PKCS7_HEADER, PKCS7_FOOTER)
+
+def get_file_type(filename):
+
+ with open(filename, 'r') as f:
+ data = f.read()
+
+ if data.startswith(CSR_HEADER):
+ return 'csr'
+
+ if data.startswith(CERT_HEADER):
+ return 'cert'
+
+ if data.startswith(PKCS7_HEADER):
+ return 'pkcs7'
+
+ return None
+
class NSSDatabase(object):
- def __init__(self, directory, password=None, password_file=None):
+ def __init__(self, directory, token='internal', password=None, password_file=None):
self.directory = directory
+ self.token = token
self.tmpdir = tempfile.mkdtemp()
@@ -88,29 +116,38 @@ class NSSDatabase(object):
shutil.rmtree(self.tmpdir)
def add_cert(self,
- nickname, cert_file,
- trust_attributes='u,u,u'):
+ nickname,
+ cert_file,
+ trust_attributes=',,'):
- subprocess.check_call([
+ cmd = [
'certutil',
'-A',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
'-i', cert_file,
'-t', trust_attributes
- ])
+ ]
+
+ subprocess.check_call(cmd)
def modify_cert(self,
nickname,
- trust_attributes='u,u,u'):
+ trust_attributes):
- subprocess.check_call([
+ cmd = [
'certutil',
'-M',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
'-t', trust_attributes
- ])
+ ]
+
+ subprocess.check_call(cmd)
def create_noise(self, noise_file, size=2048):
@@ -123,27 +160,56 @@ class NSSDatabase(object):
def create_request(self,
subject_dn,
- noise_file,
- request_file):
+ request_file,
+ noise_file=None,
+ key_type=None,
+ key_size=None,
+ curve=None,
+ hash_alg=None):
tmpdir = tempfile.mkdtemp()
try:
+ if not noise_file:
+ noise_file = os.path.join(tmpdir, 'noise.bin')
+ if key_size:
+ size = key_size
+ else:
+ size = 2048
+ self.create_noise(
+ noise_file=noise_file,
+ size=size)
+
binary_request_file = os.path.join(tmpdir, 'request.bin')
- b64_request_file = os.path.join(tmpdir, 'request.b64')
- # generate binary request
- subprocess.check_call([
+ cmd = [
'certutil',
'-R',
'-d', self.directory,
+ '-h', self.token,
'-f', self.password_file,
'-s', subject_dn,
- '-z', noise_file,
- '-o', binary_request_file
- ])
+ '-o', binary_request_file,
+ '-z', noise_file
+ ]
+
+ if key_type:
+ cmd.extend(['-k', key_type])
+
+ if key_size:
+ cmd.extend(['-g', str(key_size)])
+
+ if curve:
+ cmd.extend(['-q', curve])
+
+ if hash_alg:
+ cmd.extend(['-Z', hash_alg])
+
+ # generate binary request
+ subprocess.check_call(cmd)
# encode binary request in base-64
+ b64_request_file = os.path.join(tmpdir, 'request.b64')
subprocess.check_call([
'BtoA', binary_request_file, b64_request_file])
@@ -167,11 +233,12 @@ class NSSDatabase(object):
serial='1',
validity=240):
- p = subprocess.Popen([
+ cmd = [
'certutil',
'-C',
'-x',
'-d', self.directory,
+ '-h', self.token,
'-f', self.password_file,
'-c', subject_dn,
'-a',
@@ -184,7 +251,9 @@ class NSSDatabase(object):
'-3',
'--extSKID',
'--extAIA'
- ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ ]
+
+ p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
keystroke = ''
@@ -245,7 +314,7 @@ class NSSDatabase(object):
rc = p.wait()
if rc:
- raise Exception('Failed to generate self-signed CA certificate. RC: %d' + rc)
+ raise Exception('Failed to generate self-signed CA certificate. RC: %d' % rc)
def get_cert(self, nickname, output_format='pem'):
@@ -258,13 +327,17 @@ class NSSDatabase(object):
else:
raise Exception('Unsupported output format: %s' % output_format)
- cert_data = subprocess.check_output([
+ cmd = [
'certutil',
'-L',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname,
output_format_option
- ])
+ ]
+
+ cert_data = subprocess.check_output(cmd)
if output_format == 'base64':
cert_data = base64.b64encode(cert_data)
@@ -273,12 +346,127 @@ class NSSDatabase(object):
def remove_cert(self, nickname):
- subprocess.check_call([
+ cmd = [
'certutil',
'-D',
'-d', self.directory,
+ '-h', self.token,
+ '-f', self.password_file,
'-n', nickname
- ])
+ ]
+
+ subprocess.check_call(cmd)
+
+ def import_cert_chain(self, nickname, cert_chain_file, trust_attributes=None):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ file_type = get_file_type(cert_chain_file)
+
+ if file_type == 'cert': # import single PEM cert
+ self.add_cert(
+ nickname=nickname,
+ cert_file=cert_chain_file,
+ trust_attributes=trust_attributes)
+ return self.get_cert(
+ nickname=nickname,
+ output_format='base64')
+
+ elif file_type == 'pkcs7': # import PKCS #7 cert chain
+ return self.import_pkcs7(
+ pkcs7_file=cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes,
+ output_format='base64')
+
+ else: # import PKCS #7 data without header/footer
+ with open(cert_chain_file, 'r') as f:
+ base64_data = f.read()
+ pkcs7_data = convert_pkcs7(base64_data, 'base64', 'pem')
+
+ tmp_cert_chain_file = os.path.join(tmpdir, 'cert_chain.p7b')
+ with open(tmp_cert_chain_file, 'w') as f:
+ f.write(pkcs7_data)
+
+ self.import_pkcs7(
+ pkcs7_file=tmp_cert_chain_file,
+ nickname=nickname,
+ trust_attributes=trust_attributes)
+
+ return base64_data
+
+ finally:
+ shutil.rmtree(tmpdir)
+
+ def import_pkcs7(self, pkcs7_file, nickname, trust_attributes=None, output_format='pem'):
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ # export certs from PKCS #7 into PEM output
+ output = subprocess.check_output([
+ 'openssl',
+ 'pkcs7',
+ '-print_certs',
+ '-in', pkcs7_file
+ ])
+
+ # parse PEM output into separate PEM certificates
+ certs = []
+ lines = []
+ state = 'header'
+
+ for line in output.splitlines():
+
+ if state == 'header':
+ if line != CERT_HEADER:
+ # ignore header lines
+ pass
+ else:
+ # save cert header
+ lines.append(line)
+ state = 'body'
+
+ elif state == 'body':
+ if line != CERT_FOOTER:
+ # save cert body
+ lines.append(line)
+ else:
+ # save cert footer
+ lines.append(line)
+
+ # construct PEM cert
+ cert = '\n'.join(lines)
+ certs.append(cert)
+ lines = []
+ state = 'header'
+
+ # import PEM certs into NSS database
+ counter = 1
+ for cert in certs:
+
+ cert_file = os.path.join(tmpdir, 'cert%d.pem' % counter)
+ with open(cert_file, 'w') as f:
+ f.write(cert)
+
+ if counter == 1:
+ n = nickname
+ else:
+ n = '%s #%d' % (nickname, counter)
+
+ self.add_cert(n, cert_file, trust_attributes)
+
+ counter += 1
+
+ # convert PKCS #7 data to the requested format
+ with open(pkcs7_file, 'r') as f:
+ data = f.read()
+
+ return convert_pkcs7(data, 'pem', output_format)
+
+ finally:
+ shutil.rmtree(tmpdir)
def import_pkcs12(self, pkcs12_file, pkcs12_password=None, pkcs12_password_file=None):
@@ -296,13 +484,16 @@ class NSSDatabase(object):
else:
raise Exception('Missing PKCS #12 password')
- subprocess.check_call([
+ cmd = [
'pk12util',
'-d', self.directory,
+ '-h', self.token,
'-k', self.password_file,
'-i', pkcs12_file,
'-w', password_file
- ])
+ ]
+
+ subprocess.check_call(cmd)
finally:
shutil.rmtree(tmpdir)
@@ -323,14 +514,16 @@ class NSSDatabase(object):
else:
raise Exception('Missing PKCS #12 password')
- subprocess.check_call([
+ cmd = [
'pk12util',
'-d', self.directory,
'-k', self.password_file,
'-o', pkcs12_file,
'-w', password_file,
'-n', nickname
- ])
+ ]
+
+ subprocess.check_call(cmd)
finally:
shutil.rmtree(tmpdir)