path: root/base
diff options
authorAde Lee <>2015-09-14 22:42:57 -0400
committerAde Lee <>2015-09-27 11:16:06 -0400
commite037a73e1ea52719473e03c554ce6e3544967907 (patch)
treef58d265c30f27e6b8a3f18078af63ce999ef72c5 /base
parentd8f9c77a62a7bfea82e892f8ecb309630826fed5 (diff)
Python client for subcas
Includes python code (and unit tests!) to list, get and create subCAs. Also fixed a couple of PEP 8 violations that crept in.
Diffstat (limited to 'base')
4 files changed, 519 insertions, 28 deletions
diff --git a/base/common/python/pki/ b/base/common/python/pki/
new file mode 100644
index 000000000..295c4ead8
--- /dev/null
+++ b/base/common/python/pki/
@@ -0,0 +1,445 @@
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; version 2 of the License.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+# Copyright (C) 2014 Red Hat, Inc.
+# All rights reserved.
+# Author:
+# Ade Lee <>
+from __future__ import absolute_import
+from __future__ import print_function
+import json
+from six import iteritems
+import uuid
+import pki
+import pki.client as client
+import pki.encoder as encoder
+import pki.cert as cert
+class AuthorityData(object):
+ """
+ Class containing authority data to be sent to/from the server when
+ getting or creating subordinate CAs
+ """
+ json_attribute_names = {
+ 'id': 'aid',
+ 'description': 'description',
+ 'dn': 'dn',
+ 'enabled': 'enabled',
+ 'isHostAuthority': 'is_host_authority',
+ 'link': 'link',
+ 'parentAID': 'parent_aid'
+ }
+ def __init__(self, dn=None, aid=None, parent_aid=None,
+ description=None, enabled="False",
+ is_host_authority="False", link=None):
+ self.dn = dn
+ self.aid = aid
+ self.parent_aid = parent_aid
+ self.description = description
+ self.enabled = (enabled.lower() == "true")
+ self.is_host_authority = (is_host_authority.lower() == "true")
+ = link
+ def __repr__(self):
+ attributes = {
+ "AuthorityData": {
+ "aid": self.aid,
+ "dn": self.dn,
+ "description": self.description,
+ "is_host_authority": self.is_host_authority,
+ "parent_aid": self.parent_aid,
+ "enabled": self.enabled
+ }
+ }
+ return str(attributes)
+ @classmethod
+ def from_json(cls, attr_list):
+ """ Return AuthorityData object from JSON dict """
+ ca_data = cls()
+ for k, v in iteritems(attr_list):
+ if k not in ['link']:
+ if k in AuthorityData.json_attribute_names:
+ setattr(ca_data, AuthorityData.json_attribute_names[k], v)
+ else:
+ setattr(ca_data, k, v)
+ if 'link' in attr_list:
+ = pki.Link.from_json(attr_list['link'])
+ return ca_data
+class AuthorityDataCollection(object):
+ """
+ Class containing list of AuthorityData objects and their respective link
+ objects.
+ This data is returned when searching/listing authorities.
+ """
+ def __init__(self):
+ """ Constructor """
+ self.ca_list = []
+ self.links = []
+ def __iter__(self):
+ return iter(self.ca_list)
+ @classmethod
+ def from_json(cls, json_value):
+ """ Populate object from JSON input """
+ ret = cls()
+ cas = json_value
+ if not isinstance(cas, list):
+ ret.ca_list.append(AuthorityData.from_json(cas))
+ else:
+ for ca in cas:
+ ret.ca_list.append(
+ AuthorityData.from_json(ca))
+ return ret
+class AuthorityClient(object):
+ """
+ Class encapsulating and mirroring the functionality in the
+ AuthorityResource Java interface class defining the REST API for
+ subordinate CA (authority) resources.
+ """
+ def __init__(self, connection):
+ """ Constructor """
+ self.connection = connection
+ self.ca_url = '/rest/authorities'
+ @pki.handle_exceptions()
+ def get_ca(self, aid):
+ """ Return a AuthorityData object for a subordinate CA. """
+ if aid is None:
+ raise ValueError("Subordinate aid must be specified")
+ url = self.ca_url + '/' + str(aid)
+ headers = {'Content-type': 'application/json',
+ 'Accept': 'application/json'}
+ r = self.connection.get(url, headers)
+ return AuthorityData.from_json(r.json())
+ @pki.handle_exceptions()
+ def get_cert(self, aid, output_format="PEM"):
+ """Return the signing certificate for the CA
+ :param aid: ID for the CA
+ :param output_format: either 'PEM' or 'DER'
+ :return: CA certificate in relevant format
+ """
+ """ Return the signing certificate for the CA. """
+ if aid is None:
+ raise ValueError("CA ID must be specified")
+ url = '{}/{}/cert'.format(self.ca_url, str(aid))
+ headers = {'Content-type': 'application/json'}
+ if output_format == "PEM":
+ headers['Accept'] = "application/x-pem-file"
+ elif output_format == "DER":
+ headers['Accept'] = "application/pkix-cert"
+ else:
+ raise ValueError(
+ "Invalid format passed in - PEM or DER expected.")
+ r = self.connection.get(url, headers)
+ return r.text
+ @pki.handle_exceptions()
+ def get_chain(self, aid, output_format="PKCS7"):
+ """Returns the certificate chain for the CA.
+ :param aid: ID for the CA
+ :param output_format: either PEM or PKCS7
+ :return: CA certificate chain in requested format
+ """
+ if aid is None:
+ raise ValueError("CA ID must be specified")
+ url = '{}/{}/chain'.format(self.ca_url, str(aid))
+ headers = {'Content-type': 'application/json'}
+ if output_format == "PEM":
+ headers['Accept'] = "application/x-pem-file"
+ elif output_format == "PKCS7":
+ headers['Accept'] = "application/pkcs7-mime"
+ r = self.connection.get(url, headers)
+ return r.text
+ @pki.handle_exceptions()
+ def list_cas(self, max_results=None, max_time=None, start=None, size=None):
+ """ Return a AuthorityDataCollection object of subordinate CAs
+ Right now, this is going to list all the defined authorities. We will
+ add search criteria when this is defined on the Java interface.
+ """
+ query_params = {"maxResults": max_results, "maxTime": max_time,
+ "start": start, "size": size}
+ headers = {'Content-type': 'application/json',
+ 'Accept': 'application/json'}
+ response = self.connection.get(
+ path=self.ca_url,
+ headers=headers,
+ params=query_params)
+ return AuthorityDataCollection.from_json(response.json())
+ @pki.handle_exceptions()
+ def create_ca(self, ca_data):
+ """ Create authority (subCA)
+ :param ca_data: AuthorityData object containing parameters that
+ describe how a subordinate authority should be constructed.
+ :return: AuthorityData object for the created subordinate CA
+ """
+ if ca_data is None:
+ raise ValueError("ca_data must be defined")
+ if ca_data.dn is None:
+ raise ValueError("Subject DN must be defined in ca_data")
+ if ca_data.description is None:
+ raise ValueError('Description must be defined in ca_data')
+ if ca_data.parent_aid is None:
+ raise ValueError('parent_aid must be defined. '
+ 'Top level CAs are not yet supported')
+ create_request = json.dumps(ca_data, cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
+ headers = {'Content-type': 'application/json',
+ 'Accept': 'application/json'}
+ response =
+ self.ca_url,
+ create_request,
+ headers)
+ new_ca = AuthorityData.from_json(response.json())
+ return new_ca
+ @pki.handle_exceptions()
+ def enable_ca(self, aid):
+ """Enable the specified CA
+ :param aid: ID of the CA to be enabled
+ :return: None
+ """
+ if aid is None:
+ raise ValueError("CA ID must be specified")
+ url = '{}/{}/enable'.format(self.ca_url, str(aid))
+ headers = {'Content-type': 'application/json',
+ 'Accept': 'application/json'}
+, headers)
+ @pki.handle_exceptions()
+ def disable_ca(self, aid):
+ """Disable the specified CA
+ :param aid: ID of the CA to be disabled
+ :return: None
+ """
+ if aid is None:
+ raise ValueError("CA ID must be specified")
+ url = '{}/{}/disable'.format(self.ca_url, str(aid))
+ headers = {'Content-type': 'application/json',
+ 'Accept': 'application/json'}
+, headers)
+encoder.NOTYPES['AuthorityData'] = AuthorityData
+def issue_cert_using_authority(cert_client, authority_id):
+ print("Issuing Cert using subordinate CA")
+ print("---------------------------------")
+ print("aid: " + authority_id)
+ inputs = dict()
+ inputs['cert_request_type'] = 'crmf'
+ inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgN" \
+ "/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9U" \
+ "uIhGfXOtxy5vRetmDHE9u0AopmuJbr1rL17/tSnDakpkE9u" \
+ "mQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0" \
+ "hkiG9w0BAQUFAAOBgQCuywnrDk/wGwfbguw9oVs9gzFQwM4" \
+ "zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6" \
+ "ZmDd8WpPJ+MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm" \
+ "7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REh" \
+ "Jw=="
+ inputs['sn_uid'] = 'test12345'
+ inputs['sn_e'] = ''
+ inputs['sn_cn'] = 'TestUser'
+ enrollment_results = cert_client.enroll_cert(
+ 'caUserCert', inputs, authority_id)
+ for enrollment_result in enrollment_results:
+ request_data = enrollment_result.request
+ cert_data = enrollment_result.cert
+ print('Request ID: ' + request_data.request_id)
+ print('Request Status:' + request_data.request_status)
+ print('Serial Number: ' + cert_data.serial_number)
+ print('Issuer: ' + cert_data.issuer_dn)
+ print('Subject: ' + cert_data.subject_dn)
+ print('Pretty Print:')
+ print(cert_data.pretty_repr)
+ print()
+def main():
+ # Create a PKIConnection object that stores the details of the CA.
+ connection = client.PKIConnection('https', 'localhost', '8453', 'ca')
+ # The pem file used for authentication. Created from a p12 file using the
+ # command -
+ # openssl pkcs12 -in <p12_file_path> -out /tmp/auth.pem -nodes
+ connection.set_authentication_cert("/tmp/auth.pem")
+ # Instantiate the CertClient
+ ca_client = AuthorityClient(connection)
+ # Create a top level authority
+ print("Creating a new top level CA")
+ print("-----------------------------")
+ subca_subject = ('cn=subca ' + str(uuid.uuid4()) +
+ ' signing cert,')
+ sub_subca_subject = ('cn=subca2 ' + str(uuid.uuid4()) +
+ ' signing cert,')
+ authority_data = {
+ 'dn': subca_subject,
+ 'description': 'Test Top-level subordinate CA',
+ }
+ data = AuthorityData(**authority_data)
+ try:
+ subca = ca_client.create_ca(data)
+ except ValueError as e:
+ print(e.message)
+ # Get a top-level CA
+ print("Getting a top level CA")
+ print("----------------------")
+ authorities = ca_client.list_cas()
+ for ca in authorities.ca_list:
+ if ca.parent_aid is None:
+ top_ca = ca
+ print(str(top_ca))
+ # Create a sub CA
+ print("Creating a new subordinate CA")
+ print("-----------------------------")
+ authority_data = {
+ 'dn': subca_subject,
+ 'description': 'Test subordinate CA',
+ 'parent_aid': top_ca.aid
+ }
+ data = AuthorityData(**authority_data)
+ subca = ca_client.create_ca(data)
+ print(ca_client.get_ca(subca.aid))
+ # Get the authority signing cert and pkcs7 chain
+ pem_cert = ca_client.get_cert(subca.aid, "PEM")
+ print("PEM CA Signing Cert:")
+ print(pem_cert)
+ pkcs7_chain = ca_client.get_chain(subca.aid, "PKCS7")
+ print("PKCS7 Cert Chain:")
+ print(pkcs7_chain)
+ pem_chain = ca_client.get_chain(subca.aid, "PEM")
+ print("PEM Cert Chain:")
+ print(pem_chain)
+ # List all authorities
+ print("Listing all authorities")
+ print("-----------------------")
+ authorities = ca_client.list_cas()
+ for ca in authorities.ca_list:
+ print(str(ca))
+ # Issue a cert using the sub-CA
+ cert_client = cert.CertClient(connection)
+ issue_cert_using_authority(cert_client, subca.aid)
+ # Create a sub-sub CA
+ print('Create a sub-sub CA')
+ print('-------------------')
+ sub_subca_data = {
+ 'dn': sub_subca_subject,
+ 'description': 'Test sub-sub CA',
+ 'parent_aid': subca.aid
+ }
+ data = AuthorityData(**sub_subca_data)
+ sub_subca = ca_client.create_ca(data)
+ print(ca_client.get_ca(sub_subca.aid))
+ # Get the authority signing cert and PKCS7
+ # Get the authority signing cert and pkcs7 chain
+ pem_cert = ca_client.get_cert(sub_subca.aid, "PEM")
+ print("PEM CA Signing Cert:")
+ print(pem_cert)
+ pkcs7_chain = ca_client.get_chain(sub_subca.aid, "PKCS7")
+ print("PKCS7 Cert Chain:")
+ print(pkcs7_chain)
+ pem_chain = ca_client.get_chain(sub_subca.aid, "PEM")
+ print("PEM Cert Chain:")
+ print(pem_chain)
+ # issue a cert using the sub-subca
+ cert_client = cert.CertClient(connection)
+ issue_cert_using_authority(cert_client, sub_subca.aid)
+ # disable the sub-subca
+ print("Disable sub sub CA")
+ ca_client.disable_ca(sub_subca.aid)
+ # Get sub-subca
+ sub_subca = ca_client.get_ca(sub_subca.aid)
+ print(str(sub_subca))
+ # issue a cert using sub-subca
+ issue_cert_using_authority(cert_client, sub_subca.aid)
+if __name__ == "__main__":
+ main()
diff --git a/base/common/python/pki/ b/base/common/python/pki/
index 2d58949cd..b11757ca2 100644
--- a/base/common/python/pki/
+++ b/base/common/python/pki/
@@ -676,7 +676,7 @@ class CertClient(object):
def _submit_revoke_request(self, url, cert_serial_number,
revocation_reason=None, invalidity_date=None,
- comments=None, nonce=None):
+ comments=None, nonce=None, authority=None):
Submits a certificate revocation request.
Expects the URL for submitting the request.
@@ -698,12 +698,22 @@ class CertClient(object):
revoke_request = json.dumps(request, cls=encoder.CustomTypeEncoder,
- r =, revoke_request, headers=self.headers)
+ params = {}
+ if authority:
+ params['authority'] = authority
+ r =
+ url,
+ revoke_request,
+ headers=self.headers,
+ params=params)
return CertRequestInfo.from_json(r.json())
def revoke_cert(self, cert_serial_number, revocation_reason=None,
- invalidity_date=None, comments=None, nonce=None):
+ invalidity_date=None, comments=None, nonce=None,
+ authority=None):
""" Revokes a certificate.
Returns a CertRequestInfo object with information about the request.
This method requires an agent's authentication cert in the
@@ -712,11 +722,12 @@ class CertClient(object):
url = self.agent_cert_url + '/' + str(cert_serial_number) + '/revoke'
return self._submit_revoke_request(url, cert_serial_number,
revocation_reason, invalidity_date,
- comments, nonce)
+ comments, nonce, authority)
def revoke_ca_cert(self, cert_serial_number, revocation_reason=None,
- invalidity_date=None, comments=None, nonce=None):
+ invalidity_date=None, comments=None, nonce=None,
+ authority=None):
""" Revokes a CA certificate.
Returns a CertRequestInfo object with information about the request.
This method requires an agent's authentication cert in the
@@ -726,10 +737,10 @@ class CertClient(object):
return self._submit_revoke_request(url, cert_serial_number,
revocation_reason, invalidity_date,
- comments, nonce)
+ comments, nonce, authority)
- def hold_cert(self, cert_serial_number, comments=None):
+ def hold_cert(self, cert_serial_number, comments=None, authority=None):
""" Places a certificate on-hold.
Calls the revoke_cert method with reason -
@@ -738,10 +749,10 @@ class CertClient(object):
connection object.
return self.revoke_cert(cert_serial_number, 'Certificate_Hold',
- comments=comments)
+ comments=comments, authority=authority)
- def unrevoke_cert(self, cert_serial_number):
+ def unrevoke_cert(self, cert_serial_number, authority=None):
""" Un-revokes a revoked certificate.
Returns a CertRequestInfo object.
This method requires an agent's authentication cert in the
@@ -751,7 +762,17 @@ class CertClient(object):
raise ValueError("Certificate ID must be specified")
url = self.agent_cert_url + '/' + str(cert_serial_number) + '/unrevoke'
- r =, None, headers=self.headers)
+ params = {}
+ if authority is not None:
+ params['authority'] = authority
+ r =
+ url,
+ None,
+ headers=self.headers,
+ params=params)
return CertRequestInfo.from_json(r.json())
@@ -950,7 +971,7 @@ class CertClient(object):
return enrollment_template
- def submit_enrollment_request(self, enrollment_request):
+ def submit_enrollment_request(self, enrollment_request, authority=None):
Submits the CertEnrollmentRequest object to the server.
Returns a CertRequestInfoCollection object with information about the
@@ -959,13 +980,18 @@ class CertClient(object):
request_object = json.dumps(enrollment_request,
+ params = {}
+ if authority is not None:
+ params['authority'] = authority
# print request_object
r =, request_object,
- self.headers)
+ self.headers, params)
return CertRequestInfoCollection.from_json(r.json())
- def enroll_cert(self, profile_id, inputs):
+ def enroll_cert(self, profile_id, inputs, authority=None):
A convenience method for enrolling a certificate for a given profile id.
The inputs parameter should be a dictionary with values for the profile
@@ -988,7 +1014,8 @@ class CertClient(object):
enroll_request = self.create_enrollment_request(profile_id, inputs)
# Submit the enrollment request
- cert_request_infos = self.submit_enrollment_request(enroll_request)
+ cert_request_infos = self.submit_enrollment_request(
+ enroll_request, authority)
# Approve the requests generated for the certificate enrollment.
# Fetch the CertData objects for all the certificates created and
diff --git a/base/server/python/pki/server/ b/base/server/python/pki/server/
index ec4dd7e9c..01f1e9427 100644
--- a/base/server/python/pki/server/
+++ b/base/server/python/pki/server/
@@ -40,6 +40,7 @@ REGISTRY_DIR = '/etc/sysconfig/pki'
SUBSYSTEM_TYPES = ['ca', 'kra', 'ocsp', 'tks', 'tps']
class PKIServer(object):
@@ -75,7 +76,12 @@ class PKISubsystem(object):
self.cs_conf = os.path.join(self.conf_dir, 'CS.cfg')
self.context_xml_template = os.path.join(
- pki.SHARE_DIR,, 'conf', 'Catalina', 'localhost', + '.xml')
+ pki.SHARE_DIR,
+ 'conf',
+ 'Catalina',
+ 'localhost',
+ + '.xml')
self.context_xml = os.path.join(
instance.conf_dir, 'Catalina', 'localhost', + '.xml')
@@ -117,18 +123,26 @@ class PKISubsystem(object):
def create_subsystem_cert_object(self, cert_id):
cert = {}
cert['id'] = cert_id
- cert['nickname'] = self.config.get('%s.%s.nickname' % (, cert_id), None)
- cert['token'] = self.config.get('%s.%s.tokenname' % (, cert_id), None)
- cert['data'] = self.config.get('%s.%s.cert' % (, cert_id), None)
- cert['request'] = self.config.get('%s.%s.certreq' % (, cert_id), None)
+ cert['nickname'] = self.config.get(
+ '%s.%s.nickname' % (, cert_id), None)
+ cert['token'] = self.config.get(
+ '%s.%s.tokenname' % (, cert_id), None)
+ cert['data'] = self.config.get(
+ '%s.%s.cert' % (, cert_id), None)
+ cert['request'] = self.config.get(
+ '%s.%s.certreq' % (, cert_id), None)
return cert
def update_subsystem_cert(self, cert):
cert_id = cert['id']
- self.config['%s.%s.nickname' % (, cert_id)] = cert.get('nickname', None)
- self.config['%s.%s.tokenname' % (, cert_id)] = cert.get('token', None)
- self.config['%s.%s.cert' % (, cert_id)] = cert.get('data', None)
- self.config['%s.%s.certreq' % (, cert_id)] = cert.get('request', None)
+ self.config['%s.%s.nickname' % (, cert_id)] = (
+ cert.get('nickname', None))
+ self.config['%s.%s.tokenname' % (, cert_id)] = (
+ cert.get('token', None))
+ self.config['%s.%s.cert' % (, cert_id)] = (
+ cert.get('data', None))
+ self.config['%s.%s.certreq' % (, cert_id)] = (
+ cert.get('request', None))
def save(self):
sorted_config = sorted(self.config.items(), key=operator.itemgetter(0))
@@ -177,7 +191,9 @@ class PKISubsystem(object):
url = 'ldap://%s:%s' % (hostname, port)
- raise Exception('Invalid parameter value in %s.ldapconn.secureConn: %s' % (name, secure))
+ raise Exception(
+ 'Invalid parameter value in %s.ldapconn.secureConn: %s' %
+ (name, secure))
connection = PKIDatabaseConnection(url)
@@ -192,12 +208,15 @@ class PKISubsystem(object):
elif auth_type == 'SslClientAuth':
- client_cert_nickname=self.config['%s.ldapauth.clientCertNickname' % name],
+ client_cert_nickname=self.config[
+ '%s.ldapauth.clientCertNickname' % name],
- raise Exception('Invalid parameter value in %s.ldapauth.authtype: %s' % (name, auth_type))
+ raise Exception(
+ 'Invalid parameter value in %s.ldapauth.authtype: %s' %
+ (name, auth_type))
@@ -391,7 +410,7 @@ class PKIDatabaseConnection(object):
self.nssdb_dir = nssdb_dir
def set_credentials(self, bind_dn=None, bind_password=None,
- client_cert_nickname=None, nssdb_password=None):
+ client_cert_nickname=None, nssdb_password=None):
self.bind_dn = bind_dn
self.bind_password = bind_password
self.client_cert_nickname = client_cert_nickname
diff --git a/base/server/python/pki/server/cli/ b/base/server/python/pki/server/cli/
index 688a5c6ed..f72292a9d 100644
--- a/base/server/python/pki/server/cli/
+++ b/base/server/python/pki/server/cli/
@@ -511,7 +511,7 @@ class SubsystemCertUpdateCLI(pki.cli.CLI):
subsystem_cert['data'] = data
# format cert data for LDAP database
- lines = [data[i:i+64] for i in range(0, len(data), 64)]
+ lines = [data[i:i + 64] for i in range(0, len(data), 64)]
data = string.join(lines, '\r\n') + '\r\n'
# get cert request from local CA