summaryrefslogtreecommitdiffstats
path: root/base/server/python/pki/server/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'base/server/python/pki/server/__init__.py')
-rw-r--r--base/server/python/pki/server/__init__.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/base/server/python/pki/server/__init__.py b/base/server/python/pki/server/__init__.py
index 2c735f0ef..43082bb63 100644
--- a/base/server/python/pki/server/__init__.py
+++ b/base/server/python/pki/server/__init__.py
@@ -342,6 +342,13 @@ class PKISubsystem(object):
return str(self.instance) + '/' + self.name
+class ExternalCert(object):
+
+ def __init__(self, nickname=None, token=None):
+ self.nickname = nickname
+ self.token = token
+
+
@functools.total_ordering
class PKIInstance(object):
@@ -357,6 +364,9 @@ class PKIInstance(object):
self.conf_dir = os.path.join(self.base_dir, 'conf')
self.password_conf = os.path.join(self.conf_dir, 'password.conf')
+ self.external_certs_conf = os.path.join(
+ self.conf_dir, 'external_certs.conf')
+ self.external_certs = []
self.nssdb_dir = os.path.join(self.base_dir, 'alias')
self.lib_dir = os.path.join(self.base_dir, 'lib')
@@ -440,6 +450,8 @@ class PKIInstance(object):
value = parts[1]
self.passwords[name] = value
+ self.load_external_certs(self.external_certs_conf)
+
# load subsystems
for subsystem_name in os.listdir(self.registry_dir):
if subsystem_name in SUBSYSTEM_TYPES:
@@ -450,6 +462,30 @@ class PKIInstance(object):
subsystem.load()
self.subsystems.append(subsystem)
+ def load_external_certs(self, conf_file):
+ self.external_certs = PKIInstance.read_external_certs(conf_file)
+
+ @staticmethod
+ def read_external_certs(conf_file):
+ external_certs = []
+ # load external certs data
+ if os.path.exists(conf_file) and os.stat(conf_file).st_size > 0:
+ tmp_certs = {}
+ lines = open(conf_file).read().splitlines()
+ for line in lines:
+ m = re.search('(\\d+)\\.(\\w+)=(.*)', line)
+ if not m:
+ raise pki.PKIException('Error parsing %s' % conf_file)
+ indx = m.group(1)
+ attr = m.group(2)
+ value = m.group(3)
+ if indx not in tmp_certs:
+ tmp_certs[indx] = ExternalCert()
+
+ setattr(tmp_certs[indx], attr, value)
+ external_certs = tmp_certs.values()
+ return external_certs
+
def get_password(self, name):
if name in self.passwords:
return self.passwords[name]
@@ -465,6 +501,76 @@ class PKIInstance(object):
token=token,
password=self.get_password(token))
+ def external_cert_exists(self, nickname, token):
+ for cert in self.external_certs:
+ if cert.nickname == nickname and cert.token == token:
+ return True
+ return False
+
+ def add_external_cert(self, nickname, token):
+ if self.external_cert_exists(nickname, token):
+ return
+ self.external_certs.append(ExternalCert(nickname, token))
+ self.save_external_cert_data()
+
+ def delete_external_cert(self, nickname, token):
+ for cert in self.external_certs:
+ if cert.nickname == nickname and cert.token == token:
+ self.external_certs.remove(cert)
+ self.save_external_cert_data()
+
+ def save_external_cert_data(self):
+ with io.open(self.external_certs_conf, 'wb') as f:
+ indx = 0
+ for cert in self.external_certs:
+ f.write('%s.nickname=%s\n' % (str(indx), cert.nickname))
+ f.write('%s.token=%s\n' % (str(indx), cert.token))
+ indx += 1
+
+ def export_external_certs(self, pkcs12_file, pkcs12_password_file,
+ new_file=False):
+ for cert in self.external_certs:
+ nickname = cert.nickname
+ token = cert.token
+ if token == 'Internal Key Storage Token':
+ token = 'internal'
+ nssdb_password = self.get_password(token)
+
+ tmpdir = tempfile.mkdtemp()
+
+ try:
+ nssdb_password_file = os.path.join(tmpdir, 'password.txt')
+ with open(nssdb_password_file, 'w') as f:
+ f.write(nssdb_password)
+
+ # add the certificate, key, and chain
+ cmd = [
+ 'pki',
+ '-d', self.nssdb_dir,
+ '-C', nssdb_password_file
+ ]
+
+ if token and token != 'internal':
+ cmd.extend(['--token', token])
+
+ cmd.extend([
+ 'pkcs12-cert-add',
+ '--pkcs12', pkcs12_file,
+ '--pkcs12-password-file', pkcs12_password_file,
+ ])
+
+ if new_file:
+ cmd.extend(['--new-file'])
+
+ cmd.extend([
+ nickname
+ ])
+
+ subprocess.check_call(cmd)
+
+ finally:
+ shutil.rmtree(tmpdir)
+
def get_subsystem(self, name):
for subsystem in self.subsystems:
if name == subsystem.name: