summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--base/common/python/pki/cert.py694
-rw-r--r--base/common/python/pki/profile.py577
2 files changed, 1261 insertions, 10 deletions
diff --git a/base/common/python/pki/cert.py b/base/common/python/pki/cert.py
index c01410485..b22307ad1 100644
--- a/base/common/python/pki/cert.py
+++ b/base/common/python/pki/cert.py
@@ -4,11 +4,14 @@ Created on Feb 13, 2014
@author: akoneru
"""
+import copy
import json
+import types
+
import pki
import pki.client as client
import pki.encoder as encoder
-import types
+import pki.profile as profile
class CertId(object):
@@ -104,8 +107,8 @@ class CertDataInfo(object):
class CertDataInfoCollection(object):
"""
- Class containing lists of CertDataInfo objects.
- This data is returned when searching/listing certificate records on the CA.
+ Class containing list of CertDataInfo objects and their respective link objects.
+ This data is returned when searching/listing certificate records in the CA.
"""
def __init__(self):
@@ -162,7 +165,7 @@ class CertRequestInfo(object):
cert_request_info.request_id = \
str(cert_request_info.request_url)[(str(cert_request_info.request_url).rfind("/") + 1):]
#Optional parameters
- if 'certID' in attr_list:
+ if 'certId' in attr_list:
cert_request_info.cert_id = attr_list['certId']
if 'certURL' in attr_list:
cert_request_info.cert_url = attr_list['certURL']
@@ -174,6 +177,37 @@ class CertRequestInfo(object):
return cert_request_info
+class CertRequestInfoCollection(object):
+ """
+ Class containing list of CertRequestInfo objects.
+ This data is returned when listing certificate request records in the CA.
+ """
+
+ def __init__(self):
+ self.cert_info_list = []
+ self.links = []
+
+ @classmethod
+ def from_json(cls, json_value):
+ """ Populate object from JSON input """
+ ret = cls()
+ cert_req_infos = json_value['entries']
+ if not isinstance(cert_req_infos, types.ListType):
+ ret.cert_info_list.append(CertRequestInfo.from_json(cert_req_infos))
+ else:
+ for cert_info in cert_req_infos:
+ ret.cert_info_list.append(CertRequestInfo.from_json(cert_info))
+
+ links = json_value['Link']
+ if not isinstance(links, types.ListType):
+ ret.links.append(pki.Link.from_json(links))
+ else:
+ for link in links:
+ ret.links.append(pki.Link.from_json(link))
+
+ return ret
+
+
class CertSearchRequest(object):
"""
An object of this class is used to store the search parameters
@@ -285,6 +319,330 @@ class CertRevokeRequest(object):
setattr(self, "Comments", comments)
+class CertEnrollmentRequest(object):
+ """
+ This class encapsulates the parameters required for a certificate enrollment request.
+ """
+
+ def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
+ inputs=None, outputs=None):
+ """ Constructor """
+ self.profile_id = profile_id
+ self.renewal = renewal
+ self.serial_number = serial_number
+ self.remote_host = remote_host
+ self.remote_address = remote_address
+ if inputs is None:
+ self.inputs = []
+ if outputs is None:
+ self.outputs = []
+
+ @property
+ def profile_id(self):
+ return getattr(self, 'ProfileID', None)
+
+ @profile_id.setter
+ def profile_id(self, value):
+ setattr(self, 'ProfileID', value)
+
+ @property
+ def renewal(self):
+ return getattr(self, 'Renewal', False)
+
+ @renewal.setter
+ def renewal(self, value):
+ setattr(self, 'Renewal', value)
+
+ @property
+ def serial_number(self):
+ return getattr(self, 'SerialNumber', None)
+
+ @serial_number.setter
+ def serial_number(self, value):
+ setattr(self, 'SerialNumber', value)
+
+ @property
+ def remote_host(self):
+ return getattr(self, 'RemoteHost', None)
+
+ @remote_host.setter
+ def remote_host(self, value):
+ setattr(self, 'RemoteHost', value)
+
+ @property
+ def remote_address(self):
+ return getattr(self, 'RemoteAddress', None)
+
+ @remote_address.setter
+ def remote_address(self, value):
+ setattr(self, 'RemoteAddress', value)
+
+ @property
+ def inputs(self):
+ return getattr(self, 'Input')
+
+ @inputs.setter
+ def inputs(self, value):
+ setattr(self, 'Input', value)
+
+ @property
+ def outputs(self):
+ return getattr(self, 'Output')
+
+ @outputs.setter
+ def outputs(self, value):
+ setattr(self, 'Output', value)
+
+ def add_input(self, profile_input):
+ self.inputs.append(profile_input)
+
+ def remove_input(self, profile_input_name):
+ for profile_input in self.inputs:
+ if profile_input_name == profile_input.name:
+ self.inputs.pop(profile_input)
+ break
+
+ def get_input(self, profile_input_name):
+ for profile_input in self.inputs:
+ if profile_input_name == profile_input.name:
+ return profile_input
+
+ return None
+
+ def add_output(self, profile_output):
+ self.outputs.append(profile_output)
+
+ def remove_output(self, profile_output_name):
+ for output in self.outputs:
+ if profile_output_name == output.name:
+ self.outputs.pop(output)
+ break
+
+ def get_output(self, profile_output_name):
+ for output in self.outputs:
+ if profile_output_name == output.name:
+ return output
+
+ return None
+
+ @classmethod
+ def from_json(cls, json_value):
+ enroll_request = cls()
+
+ enroll_request.profile_id = json_value['ProfileID']
+ enroll_request.renewal = json_value['Renewal']
+ if 'SerialNumber' in json_value:
+ enroll_request.serial_number = json_value['SerialNumber']
+ if 'RemoteHost' in json_value:
+ enroll_request.remote_host = json_value['RemoteHost']
+ if 'RemoteAddress' in json_value:
+ enroll_request.remote_address = json_value['RemoteAddress']
+
+ inputs = json_value['Input']
+ if not isinstance(inputs, types.ListType):
+ enroll_request.inputs.append(profile.ProfileInput.from_json(inputs))
+ else:
+ for profile_input in inputs:
+ enroll_request.inputs.append(profile.ProfileInput.from_json(profile_input))
+
+ outputs = json_value['Output']
+ if not isinstance(outputs, types.ListType):
+ enroll_request.outputs.append(profile.ProfileOutput.from_json(outputs))
+ else:
+ for profile_output in outputs:
+ enroll_request.outputs.append(profile.ProfileOutput.from_json(profile_output))
+
+ return enroll_request
+
+
+class CertReviewResponse(CertEnrollmentRequest):
+ """
+ An object of this class represent the response from the server when
+ reviewing a certificate enrollment request.
+ It contains a nonce required to perform action on the request.
+ """
+
+ def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
+ inputs=None, outputs=None, nonce=None, request_id=None, request_type=None, request_status=None,
+ request_owner=None, request_creation_time=None, request_modification_time=None, request_notes=None,
+ profile_approval_by=None, profile_set_id=None, profile_is_visible=None, profile_name=None,
+ profile_description=None, profile_remote_host=None, profile_remote_address=None, policy_sets=None):
+
+ super(CertReviewResponse, self).__init__(profile_id, renewal, serial_number, remote_host,
+ remote_address, inputs, outputs)
+ self.nonce = nonce
+ self.request_id = request_id
+ self.request_type = request_type
+ self.request_status = request_status
+ self.request_owner = request_owner
+ self.request_creation_time = request_creation_time
+ self.request_modification_time = request_modification_time
+ self.request_notes = request_notes
+ self.profile_approved_by = profile_approval_by
+ self.profile_set_id = profile_set_id
+ self.profile_is_visible = profile_is_visible
+ self.profile_name = profile_name
+ self.profile_description = profile_description
+ self.profile_remote_host = profile_remote_host
+ self.profile_remote_address = profile_remote_address
+
+ if policy_sets is None:
+ self.policy_sets = []
+ else:
+ self.policy_sets = policy_sets
+
+ @property
+ def request_id(self):
+ return getattr(self, 'requestId')
+
+ @request_id.setter
+ def request_id(self, value):
+ setattr(self, 'requestId', value)
+
+ @property
+ def request_type(self):
+ return getattr(self, 'requestType')
+
+ @request_type.setter
+ def request_type(self, value):
+ setattr(self, 'requestType', value)
+
+ @property
+ def request_status(self):
+ return getattr(self, 'requestStatus')
+
+ @request_status.setter
+ def request_status(self, value):
+ setattr(self, 'requestStatus', value)
+
+ @property
+ def request_owner(self):
+ return getattr(self, 'requestOwner')
+
+ @request_owner.setter
+ def request_owner(self, value):
+ setattr(self, 'requestOwner', value)
+
+ @property
+ def request_creation_time(self):
+ return getattr(self, 'requestCreationTime')
+
+ @request_creation_time.setter
+ def request_creation_time(self, value):
+ setattr(self, 'requestCreationTime', value)
+
+ @property
+ def request_modification_time(self):
+ return getattr(self, 'requestModificationTime')
+
+ @request_modification_time.setter
+ def request_modification_time(self, value):
+ setattr(self, 'requestModificationTime', value)
+
+ @property
+ def request_notes(self):
+ return getattr(self, 'requestNotes')
+
+ @request_notes.setter
+ def request_notes(self, value):
+ setattr(self, 'requestNotes', value)
+
+ @property
+ def profile_approved_by(self):
+ return getattr(self, 'profileApprovedBy')
+
+ @profile_approved_by.setter
+ def profile_approved_by(self, value):
+ setattr(self, 'profileApprovedBy', value)
+
+ @property
+ def profile_set_id(self):
+ return getattr(self, 'profileSetId')
+
+ @profile_set_id.setter
+ def profile_set_id(self, value):
+ setattr(self, 'profileSetId', value)
+
+ @property
+ def profile_is_visible(self):
+ return getattr(self, 'profileIsVisible')
+
+ @profile_is_visible.setter
+ def profile_is_visible(self, value):
+ setattr(self, 'profileIsVisible', value)
+
+ @property
+ def profile_name(self):
+ return getattr(self, 'profileName')
+
+ @profile_name.setter
+ def profile_name(self, value):
+ setattr(self, 'profileName', value)
+
+ @property
+ def profile_description(self):
+ return getattr(self, 'profileDescription')
+
+ @profile_description.setter
+ def profile_description(self, value):
+ setattr(self, 'profileDescription', value)
+
+ @property
+ def profile_remote_host(self):
+ return getattr(self, 'profileRemoteHost')
+
+ @profile_remote_host.setter
+ def profile_remote_host(self, value):
+ setattr(self, 'profileRemoteHost', value)
+
+ @property
+ def profile_remote_address(self):
+ return getattr(self, 'profileRemoteAddr')
+
+ @profile_remote_address.setter
+ def profile_remote_address(self, value):
+ setattr(self, 'profileRemoteAddr', value)
+
+ @property
+ def policy_sets(self):
+ return getattr(self, 'ProfilePolicySet')
+
+ @policy_sets.setter
+ def policy_sets(self, value):
+ setattr(self, 'ProfilePolicySet', value)
+
+ @classmethod
+ def from_json(cls, json_value):
+
+ #First read the values for attributes defined in CertEnrollmentRequest
+ review_response = super(CertReviewResponse, cls).from_json(json_value)
+
+ review_response.nonce = json_value['nonce']
+ review_response.request_id = json_value['requestId']
+ review_response.request_type = json_value['requestType']
+ review_response.request_status = json_value['requestStatus']
+ review_response.request_owner = json_value['requestOwner']
+ review_response.request_creation_time = json_value['requestCreationTime']
+ review_response.request_modification_time = json_value['requestModificationTime']
+ review_response.request_notes = json_value['requestNotes']
+ review_response.profile_approved_by = json_value['profileApprovedBy']
+ review_response.profile_set_id = json_value['profileSetId']
+ review_response.profile_is_visible = json_value['profileIsVisible']
+ review_response.profile_name = json_value['profileName']
+ review_response.profile_description = json_value['profileDescription']
+ review_response.profile_remote_host = json_value['profileRemoteHost']
+ review_response.profile_remote_address = json_value['profileRemoteAddr']
+
+ profile_policy_sets = json_value['ProfilePolicySet']
+ if not isinstance(profile_policy_sets, types.ListType):
+ review_response.policy_sets.append(profile.ProfilePolicySet.from_json(profile_policy_sets))
+ else:
+ for policy_set in profile_policy_sets:
+ review_response.policy_sets.append(profile.ProfilePolicySet.from_json(policy_set))
+
+ return review_response
+
+
class CertClient(object):
"""
Class encapsulating and mirroring the functionality in the CertResource Java interface class
@@ -298,6 +656,8 @@ class CertClient(object):
'Accept': 'application/json'}
self.cert_url = '/rest/certs'
self.agent_cert_url = '/rest/agent/certs'
+ self.cert_requests_url = '/rest/certrequests'
+ self.agent_cert_requests_url = '/rest/agent/certrequests'
self.enrollment_templates = {}
@pki.handle_exceptions()
@@ -399,10 +759,251 @@ class CertClient(object):
r = self.connection.post(url, None, headers=self.headers)
return CertRequestInfo.from_json(r.json())
+ @pki.handle_exceptions()
+ def get_request(self, request_id):
+ """
+ Get information of a certificate request with the given request ID.
+ Returns a CertRequestInfo object.
+ """
+
+ if request_id is None:
+ raise ValueError("Request ID must be specified")
+ url = self.cert_requests_url + '/' + str(request_id)
+ r = self.connection.get(url, headers=self.headers)
+
+ return CertRequestInfo.from_json(r.json())
+
+ @pki.handle_exceptions()
+ def list_requests(self, request_status=None, request_type=None, from_request_id=None, size=None,
+ max_results=None, max_time=None):
+ """
+ Query for a list of certificates using the arguments passed.
+ Returns a CertRequestInfoCollection object.
+ """
+
+ query_params = {
+ 'requestStatus': request_status,
+ 'requestType': request_type,
+ 'start': from_request_id,
+ 'pageSize': size,
+ 'maxResults': max_results,
+ 'maxTime': max_time
+ }
+ r = self.connection.get(self.agent_cert_requests_url, self.headers, query_params)
+ return CertRequestInfoCollection.from_json(r.json())
+
+ @pki.handle_exceptions()
+ def review_request(self, request_id):
+ """
+ Reviews a certificate enrollment request.
+ Returns a CertReviewResponse object which contains the nonce
+ from the server needed to perform an action on the request.
+ """
+ if request_id is None:
+ raise ValueError("Request Id must be specified.")
+
+ url = self.agent_cert_requests_url + '/' + str(request_id)
+ r = self.connection.get(url, headers=self.headers)
+ return CertReviewResponse.from_json(r.json())
+
+ @pki.handle_exceptions()
+ def _perform_action(self, request_id, cert_review_response, action):
+ """
+ An internal method used by all the action methods to perform
+ an action on a certificate request.
+ The parameter cert_review_response
+ """
+ if request_id is None:
+ raise ValueError("Request Id must be specified.")
+ if cert_review_response is None:
+ cert_review_response = self.review_request(request_id)
+
+ url = self.agent_cert_requests_url + '/' + request_id + '/' + action
+ review_response = json.dumps(cert_review_response, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ r = self.connection.post(url, review_response, headers=self.headers)
+ return r
+
+ def approve_request(self, request_id, cert_review_response=None):
+ """
+ Approves a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'approve')
+
+ def cancel_request(self, request_id, cert_review_response=None):
+ """
+ Cancels a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'cancel')
+
+ def reject_request(self, request_id, cert_review_response=None):
+ """
+ Rejects a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'reject')
+
+ def validate_request(self, request_id, cert_review_response):
+ """
+ Validates a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'validate')
+
+ def update_request(self, request_id, cert_review_response):
+ """
+ Updates a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'update')
+
+ def assign_request(self, request_id, cert_review_response):
+ """
+ Assigns a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'assign')
+
+ def unassign_request(self, request_id, cert_review_response):
+ """
+ Un-assigns a certificate enrollment request.
+ If cert_review_response is None, a review request operation is performed to fetch the
+ CertReviewResponse object.
+ Requires as agent level authentication.
+ """
+ return self._perform_action(request_id, cert_review_response, 'unassign')
+
+ @pki.handle_exceptions()
+ def list_enrollment_templates(self, start=None, size=None):
+ """
+ Gets the list of profile templates supported by the CA.
+ The values for start and size arguments determine the starting point and the length of the list.
+ Returns a ProfileDataInfoCollection object.
+ """
+
+ url = self.cert_requests_url + '/profiles'
+ query_params = {
+ 'start': start,
+ 'size': size
+ }
+ r = self.connection.get(url, self.headers, query_params)
+ print r
+ return profile.ProfileDataInfoCollection.from_json(r.json())
+
+ @pki.handle_exceptions()
+ def get_enrollment_template(self, profile_id):
+ """
+ Fetch the enrollment template for the given profile id.
+ For the first time, the request is sent to the server.
+ The retrieved CertEnrollmentRequest object is then cached locally for future requests.
+ Returns a CerEnrollmentRequest object.
+ """
+
+ if profile_id in self.enrollment_templates:
+ return copy.deepcopy(self.enrollment_templates[profile_id])
+ url = self.cert_requests_url + '/profiles/' + str(profile_id)
+ r = self.connection.get(url, self.headers)
+
+ #Caching the enrollment template object in-memory for future use.
+ enrollment_template = CertEnrollmentRequest.from_json(r.json())
+ self.enrollment_templates[profile_id] = enrollment_template
+
+ return copy.deepcopy(enrollment_template)
+
+ @pki.handle_exceptions()
+ def create_enrollment_request(self, profile_id, inputs):
+ """
+ Fetches the enrollment request object for the given profile and
+ sets values to its attributes using the values provided in the inputs dictionary.
+ Returns the CertEnrollmentRequest object, which can be submitted to enroll a certificate.
+ """
+ if inputs is None or len(inputs) == 0:
+ raise ValueError("No inputs provided.")
+
+ enrollment_template = self.get_enrollment_template(profile_id)
+ for profile_input in enrollment_template.inputs:
+ for attribute in profile_input.attributes:
+ if attribute.name in inputs:
+ attribute.value = inputs[attribute.name]
+
+ return enrollment_template
+
+ @pki.handle_exceptions()
+ def submit_enrollment_request(self, enrollment_request):
+ """
+ Submits the CertEnrollmentRequest object to the server.
+ Returns a CertRequestInfoCollection object with information about the certificate requests
+ enrolled at the CA.
+ """
+ request_object = json.dumps(enrollment_request, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ r = self.connection.post(self.cert_requests_url, request_object, self.headers)
+ return CertRequestInfoCollection.from_json(r.json())
+
+ @pki.handle_exceptions()
+ def enroll_cert(self, profile_id, inputs):
+ """
+ A convenience method for enrolling a certificate for a given profile id.
+ The inputs parameter should be a dictionary with values for the profile attributes
+ for an enrollment request.
+
+ Calling this method with valid arguments, creates an enrollment request, submits it
+ to the server, approves the certificate requests generated for the enrollment and
+ returns a list of CertData objects for all the certificates generated as part of this
+ enrollment.
+
+ Note: This method supports only certificate enrollment where only one agent approval
+ is sufficient.
+
+ Requires an agent level authentication.
+ """
+
+ # Create a CertEnrollmentRequest object using the inputs for the given profile id.
+ enroll_request = self.create_enrollment_request(profile_id, inputs)
+
+ # Submit the enrollment request
+ cert_request_infos = self.submit_enrollment_request(enroll_request)
+
+ # Approve the requests generated for the certificate enrollment.
+ # Fetch the CertData objects for all the certificates created and return to the caller.
+
+ certificates = []
+ for cert_request_info in cert_request_infos.cert_info_list:
+ request_id = cert_request_info.request_id
+ self.approve_request(request_id)
+ cert_id = self.get_request(request_id).cert_id
+ certificates.append(self.get_cert(cert_id))
+
+ return certificates
+
encoder.NOTYPES['CertData'] = CertData
encoder.NOTYPES['CertSearchRequest'] = CertSearchRequest
encoder.NOTYPES['CertRevokeRequest'] = CertRevokeRequest
+encoder.NOTYPES['CertEnrollmentRequest'] = CertEnrollmentRequest
+encoder.NOTYPES['ProfileInput'] = profile.ProfileInput
+encoder.NOTYPES['ProfileAttribute'] = profile.ProfileAttribute
+encoder.NOTYPES['Descriptor'] = profile.Descriptor
+encoder.NOTYPES['ProfileOutput'] = profile.ProfileOutput
+encoder.NOTYPES['CertReviewResponse'] = CertReviewResponse
+encoder.NOTYPES['ProfilePolicySet'] = profile.ProfilePolicySet
+encoder.NOTYPES['ProfilePolicy'] = profile.ProfilePolicy
+encoder.NOTYPES['PolicyDefault'] = profile.PolicyDefault
+encoder.NOTYPES['PolicyConstraint'] = profile.PolicyConstraint
+encoder.NOTYPES['PolicyConstraintValue'] = profile.PolicyConstraintValue
+encoder.NOTYPES['ProfileParameter'] = profile.ProfileParameter
def main():
@@ -416,16 +1017,76 @@ def main():
#Instantiate the CertClient
cert_client = CertClient(connection)
+ cert_client.get_enrollment_template('caUserCert')
+
+ #Enrolling an user certificate
+ print('Enrolling an user certificate')
+ print('-----------------------------')
+
+ inputs = dict()
+ inputs['cert_request_type'] = 'crmf'
+ inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgNVBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQAD" \
+ "gY0AMIGJAoGBAK/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9UuIhGfXOtxy5vRetmDHE" \
+ "9u0AopmuJbr1rL17/tSnDakpkE9umQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0eCy2a" \
+ "a4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJl" \
+ "Z1Rva2VuMBoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkqhkiG9w0BAQUFAAOBgQCuywnr" \
+ "Dk/wGwfbguw9oVs9gzFQwM4zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6ZmDd8WpPJ+" \
+ "MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REhJw=="
+ inputs['sn_uid'] = 'test12345'
+ inputs['sn_e'] = 'example@redhat.com'
+ inputs['sn_cn'] = 'TestUser'
+
+ cert_data_infos = cert_client.enroll_cert('caUserCert', inputs)
+
+ for data in cert_data_infos:
+ print('Serial Number: ' + data.serial_number)
+ print('Issuer: ' + data.issuer_dn)
+ print('Subject: ' + data.subject_dn)
+ print('Pretty Print:')
+ print(data.pretty_repr)
+
+ print
+
+ # Enrolling a server certificate
+ print("Enrolling a server certificate")
+ print('------------------------------')
+
+ inputs = dict()
+ inputs['cert_request_type'] = 'pkcs10'
+ inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwE" \
+ "wYDVQQKDAxSZWQgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wDQYJKoZIhvcNAQEBBQADgY" \
+ "0AMIGJAoGBAMJpWz92dSYCvWxllrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69jQMTLZMa8" \
+ "FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJmcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8y" \
+ "HwV0eBx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQBvkxAGKwkfK3TKwLc5Mg0IWp8zG" \
+ "RVwxdIlghAL8DugNocCNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5Uje2K/+6tW2ZTGrb" \
+ "izs7CNOTMzA/9H8CkHb4H9P/qRT275zHIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg=="
+
+ inputs['requestor_name'] = 'Tester'
+ inputs['requestor_email'] = 'example@redhat.com'
+
+ cert_data_infos_2 = cert_client.enroll_cert('caServerCert', inputs)
+ for data in cert_data_infos_2:
+ print('Serial Number: ' + data.serial_number)
+ print('Issuer: ' + data.issuer_dn)
+ print('Subject: ' + data.subject_dn)
+ print('Pretty Print:')
+ print(data.pretty_repr)
+
+ print
+
# List all the VALID certs
+ print('An example listing all VALID certs')
+ print('----------------------------------')
+
search_params = {'status': 'VALID'}
- cert_data_infos = cert_client.list_certs(**search_params)
- for cert_data_info in cert_data_infos.cert_info_list:
+ cert_data_list = cert_client.list_certs(**search_params)
+ for cert_data_info in cert_data_list.cert_info_list:
print("Serial Number: " + cert_data_info.cert_id)
print("Subject DN: " + cert_data_info.subject_dn)
print("Status: " + cert_data_info.status)
print
- #Trying an invalid get cert
+ #Trying to get a non-existing cert
#Assuming that there is no certificate with serial number = 100
try:
cert_data = cert_client.get_cert(100)
@@ -437,10 +1098,14 @@ def main():
print
# Certificate Serial Number used for CertClient methods.
- # 7 and '0x7' are also valid values
- cert_id = 0x7
+ # 7, 0x7 and '0x7' are also valid values
+ # Following examples use the serial number of the user certificate enrolled before.
+ cert_id = cert_data_infos[0].serial_number
#Get certificate data
+ print('Getting information of a certificate')
+ print('------------------------------------')
+
cert_data = cert_client.get_cert(cert_id)
# Print the certificate information
print('Serial Number: ' + cert_data.serial_number)
@@ -456,6 +1121,9 @@ def main():
print
# Review a certificate - used to get a nonce for revoke request.
+ print('Reviewing a certificate')
+ print('-----------------------')
+
cert_data = cert_client.review_cert(cert_id)
print('Serial Number: ' + cert_data.serial_number)
print('Issuer: ' + cert_data.issuer_dn)
@@ -465,6 +1133,9 @@ def main():
print
#Revoke a certificate
+ print('Revoking a certificate')
+ print('----------------------')
+
cert_request_info = cert_client.revoke_cert(cert_data.serial_number,
revocation_reason=CertRevokeRequest.REASON_CERTIFICATE_HOLD,
comments="Test revoking a cert", nonce=cert_data.nonce)
@@ -474,6 +1145,9 @@ def main():
print
#Un-revoke a certificate
+ print('Un-revoking a certificate')
+ print('-------------------------')
+
cert_request_info = cert_client.unrevoke_cert(cert_data.serial_number)
print('Request ID: ' + cert_request_info.request_id)
print('Request Type: ' + cert_request_info.request_type)
@@ -482,4 +1156,4 @@ def main():
if __name__ == "__main__":
- main()
+ main() \ No newline at end of file
diff --git a/base/common/python/pki/profile.py b/base/common/python/pki/profile.py
new file mode 100644
index 000000000..490db0994
--- /dev/null
+++ b/base/common/python/pki/profile.py
@@ -0,0 +1,577 @@
+#!/usr/bin/python
+"""
+Created on May 13,, 2014
+
+@author: akoneru
+"""
+
+import types
+
+import pki
+
+
+class ProfileDataInfo(object):
+ """Stores information about a profile"""
+ def __init__(self):
+ self.profile_id = None
+ self.profile_name = None
+ self.profile_description = None
+ self.profile_url = None
+
+ @classmethod
+ def from_json(cls, attr_list):
+ profile_data_info = cls()
+ profile_data_info.profile_id = attr_list['profileId']
+ profile_data_info.profile_name = attr_list['profileName']
+ profile_data_info.profile_description = attr_list['profileDescription']
+ profile_data_info.profile_url = attr_list['profileURL']
+
+ return profile_data_info
+
+
+class ProfileDataInfoCollection(object):
+ """
+ Represents a collection of ProfileDataInfo objects.
+ Also encapsulates the links for the list of the objects stored.
+ """
+
+ def __init__(self):
+ self.profile_data_list = []
+ self.links = []
+
+ @classmethod
+ def from_json(cls, json_value):
+ ret = cls()
+ profile_data_infos = json_value['entries']
+ if not isinstance(profile_data_infos, types.ListType):
+ ret.profile_data_list.append(ProfileDataInfo.from_json(profile_data_infos))
+ else:
+ for profile_info in profile_data_infos:
+ ret.profile_data_list.append(ProfileDataInfo.from_json(profile_info))
+
+ links = json_value['Link']
+ if not isinstance(links, types.ListType):
+ ret.links.append(pki.Link.from_json(links))
+ else:
+ for link in links:
+ ret.links.append(pki.Link.from_json(link))
+
+ return ret
+
+
+class Descriptor(object):
+ """
+ This class represents the description of a ProfileAttribute.
+ It stores information such as the syntax, constraint and default value of a profile attribute.
+ """
+
+ def __init__(self, syntax=None, constraint=None, description=None, default_value=None):
+ self.syntax = syntax
+ self.constraint = constraint
+ self.description = description
+ self.default_value = default_value
+
+ @property
+ def syntax(self):
+ return getattr(self, 'Syntax', None)
+
+ @syntax.setter
+ def syntax(self, value):
+ setattr(self, 'Syntax', value)
+
+ @property
+ def constraint(self):
+ return getattr(self, 'Constraint', None)
+
+ @constraint.setter
+ def constraint(self, value):
+ setattr(self, 'Constraint', value)
+
+ @property
+ def description(self):
+ return getattr(self, 'Description', None)
+
+ @description.setter
+ def description(self, value):
+ setattr(self, 'Description', value)
+
+ @property
+ def default_value(self):
+ return getattr(self, 'DefaultValue', None)
+
+ @default_value.setter
+ def default_value(self, value):
+ setattr(self, 'DefaultValue', value)
+
+ @classmethod
+ def from_json(cls, attr_list):
+ descriptor = cls()
+ for attr in attr_list:
+ setattr(descriptor, attr, attr_list[attr])
+
+ return descriptor
+
+
+class ProfileAttribute(object):
+ """
+ Represents a profile attribute of a ProfileInput.
+ """
+ def __init__(self, name=None, value=None, descriptor=None):
+ self.name = name
+ self.value = value
+ self.descriptor = descriptor
+
+ @property
+ def descriptor(self):
+ return getattr(self, 'Descriptor')
+
+ @descriptor.setter
+ def descriptor(self, value):
+ setattr(self, 'Descriptor', value)
+
+ @property
+ def value(self):
+ return getattr(self, 'Value')
+
+ @value.setter
+ def value(self, value):
+ setattr(self, 'Value', value)
+
+ @classmethod
+ def from_json(cls, attr_list):
+ attribute = cls()
+ attribute.name = attr_list['name']
+ if 'Value' in attr_list:
+ attribute.value = attr_list['Value']
+ if 'Descriptor' in attr_list:
+ attribute.descriptor = Descriptor.from_json(attr_list['Descriptor'])
+
+ return attribute
+
+
+class ProfileInput(object):
+ """
+ This class encapsulates all the attributes of a profile to generate a
+ specific property of a certificate.
+ Ex. Subject name, Requestor Information etc.
+ """
+
+ def __init__(self, profile_input_id=None, class_id=None, name=None, text=None, attributes=None,
+ config_attributes=None):
+
+ self.profile_input_id = profile_input_id
+ self.class_id = class_id
+ self.name = name
+ self.text = text
+ if attributes is None:
+ self.attributes = []
+ if config_attributes is None:
+ self.config_attributes = []
+
+ @property
+ def profile_input_id(self):
+ return getattr(self, 'id')
+
+ @profile_input_id.setter
+ def profile_input_id(self, value):
+ setattr(self, 'id', value)
+
+ @property
+ def class_id(self):
+ return getattr(self, 'ClassID', None)
+
+ @class_id.setter
+ def class_id(self, value):
+ setattr(self, 'ClassID', value)
+
+ @property
+ def name(self):
+ return getattr(self, 'Name', None)
+
+ @name.setter
+ def name(self, value):
+ setattr(self, 'Name', value)
+
+ @property
+ def text(self):
+ return getattr(self, 'Text', None)
+
+ @text.setter
+ def text(self, value):
+ setattr(self, 'Text', value)
+
+ @property
+ def attributes(self):
+ return getattr(self, 'Attribute')
+
+ @attributes.setter
+ def attributes(self, value):
+ setattr(self, 'Attribute', value)
+
+ @property
+ def config_attributes(self):
+ return getattr(self, 'ConfigAttribute')
+
+ @config_attributes.setter
+ def config_attributes(self, value):
+ setattr(self, 'ConfigAttribute', value)
+
+ def add_attribute(self, profile_attribute):
+ self.attributes.append(profile_attribute)
+
+ def remove_attribute(self, profile_attribute_name):
+ for attr in self.attributes:
+ if attr.name == profile_attribute_name:
+ self.attributes.remove(attr)
+ break
+
+ def get_attribute(self, profile_attribute_name):
+ for attr in self.attributes:
+ if attr.name == profile_attribute_name:
+ return attr
+
+ return None
+
+ def add_config_attribute(self, profile_attribute):
+ self.attributes.append(profile_attribute)
+
+ def remove_config_attribute(self, config_attribute_name):
+ for attr in self.config_attributes:
+ if attr.name == config_attribute_name:
+ self.attributes.remove(attr)
+ break
+
+ def get_config_attribute(self, config_attribute_name):
+ for attr in self.attributes:
+ if attr.name == config_attribute_name:
+ return attr
+
+ return None
+
+ @classmethod
+ def from_json(cls, json_value):
+ profile_input = cls()
+ profile_input.profile_input_id = json_value['id']
+ profile_input.class_id = json_value['ClassID']
+ profile_input.name = json_value['Name']
+ if 'Text' in json_value:
+ profile_input.text = json_value['Text']
+
+ attributes = json_value['Attribute']
+ if not isinstance(attributes, types.ListType):
+ profile_input.attributes.append(ProfileAttribute.from_json(attributes))
+ else:
+ for profile_info in attributes:
+ profile_input.attributes.append(ProfileAttribute.from_json(profile_info))
+
+ config_attributes = json_value['ConfigAttribute']
+ if not isinstance(config_attributes, types.ListType):
+ profile_input.config_attributes.append(ProfileAttribute.from_json(config_attributes))
+ else:
+ for config_attribute in config_attributes:
+ profile_input.config_attributes.append(ProfileAttribute.from_json(config_attribute))
+
+ return profile_input
+
+
+class ProfileOutput(object):
+ """
+ This class defines the output of a certificate enrollment request
+ using a profile.
+ """
+
+ def __init__(self, profile_output_id=None, name=None, text=None, class_id=None, attributes=None):
+ self.profile_output_id = profile_output_id
+ self.name = name
+ self.text = text
+ self.class_id = class_id
+ if attributes is None:
+ self.attributes = []
+
+ @property
+ def profile_output_id(self):
+ return getattr(self, 'id')
+
+ @profile_output_id.setter
+ def profile_output_id(self, value):
+ setattr(self, 'id', value)
+
+ @property
+ def class_id(self):
+ return getattr(self, 'classId', None)
+
+ @class_id.setter
+ def class_id(self, value):
+ setattr(self, 'classId', value)
+
+ def add_attribute(self, profile_attribute):
+ self.attributes.append(profile_attribute)
+
+ def remove_attribute(self, profile_attribute_name):
+ for attr in self.attributes:
+ if attr.name == profile_attribute_name:
+ self.attributes.remove(attr)
+ break
+
+ def get_attribute(self, profile_attribute_name):
+ for attr in self.attributes:
+ if attr.name == profile_attribute_name:
+ return attr
+
+ return None
+
+ @classmethod
+ def from_json(cls, json_value):
+ profile_output = cls()
+ profile_output.profile_output_id = json_value['id']
+ profile_output.name = json_value['name']
+ profile_output.text = json_value['text']
+ profile_output.class_id = json_value['classId']
+ attributes = json_value['attributes']
+ if not isinstance(attributes, types.ListType):
+ profile_output.attributes.append(ProfileAttribute.from_json(attributes))
+ else:
+ for profile_info in attributes:
+ profile_output.attributes.append(ProfileAttribute.from_json(profile_info))
+ return profile_output
+
+
+class ProfileParameter(object):
+
+ def __init__(self, name=None, value=None):
+ self.name = name
+ self.value = value
+
+ @classmethod
+ def from_json(cls, attr_list):
+ param = cls()
+ for attr in attr_list:
+ setattr(param, attr, attr_list[attr])
+ return param
+
+
+class PolicyDefault(object):
+ """
+ An object of this class contains information of the default usage of a specific ProfileInput.
+ """
+
+ def __init__(self, name=None, class_id=None, description=None, policy_attributes=None, policy_params=None):
+ self.name = name
+ self.class_id = class_id
+ self.description = description
+ if policy_attributes is None:
+ self.policy_attributes = []
+ else:
+ self.policy_attributes = policy_attributes
+ if policy_params is None:
+ self.policy_params = []
+ else:
+ self.policy_params = policy_params
+
+ @property
+ def name(self):
+ return getattr(self, 'id')
+
+ @name.setter
+ def name(self, value):
+ setattr(self, 'id', value)
+
+ @property
+ def class_id(self):
+ return getattr(self, 'classId')
+
+ @class_id.setter
+ def class_id(self, value):
+ setattr(self, 'classId', value)
+
+ @property
+ def policy_attributes(self):
+ return getattr(self, 'policyAttribute')
+
+ @policy_attributes.setter
+ def policy_attributes(self, value):
+ setattr(self, 'policyAttribute', value)
+
+ @property
+ def policy_params(self):
+ return getattr(self, 'params')
+
+ @policy_params.setter
+ def policy_params(self, value):
+ setattr(self, 'params', value)
+
+ @classmethod
+ def from_json(cls, json_value):
+ policy_def = cls()
+ if 'id' in json_value:
+ policy_def.name = json_value['id']
+ if 'classId' in json_value:
+ policy_def.class_id = json_value['classId']
+ if 'description' in json_value:
+ policy_def.description = json_value['description']
+ if 'policyAttribute' in json_value:
+ attributes = json_value['policyAttribute']
+ if not isinstance(attributes, types.ListType):
+ policy_def.policy_attributes.append(ProfileAttribute.from_json(attributes))
+ else:
+ for attr in attributes:
+ policy_def.policy_attributes.append(ProfileAttribute.from_json(attr))
+
+ if 'params' in json_value:
+ params = json_value['params']
+ if not isinstance(params, types.ListType):
+ policy_def.policy_params.append(ProfileParameter.from_json(params))
+ else:
+ for param in params:
+ policy_def.policy_params.append(ProfileParameter.from_json(param))
+
+ return policy_def
+
+
+class PolicyConstraintValue(object):
+
+ def __init__(self, name=None, value=None, descriptor=None):
+ self.name = name
+ self.value = value
+ self.descriptor = descriptor
+
+ @property
+ def name(self):
+ return getattr(self, 'id')
+
+ @name.setter
+ def name(self, value):
+ setattr(self, 'id', value)
+
+ @classmethod
+ def from_json(cls, json_value):
+ ret = cls()
+
+ ret.name = json_value['id']
+ ret.value = json_value['value']
+ if 'descriptor' in json_value:
+ ret.descriptor = Descriptor.from_json(json_value['descriptor'])
+
+ return ret
+
+
+class PolicyConstraint(object):
+ """
+ An object of this class contains the policy constraints applied to a ProfileInput
+ used by a certificate enrollment request.
+ """
+
+ def __init__(self, name=None, description=None, class_id=None, policy_constraint_values=None):
+ self.name = name
+ self.description = description
+ self.class_id = class_id
+ if policy_constraint_values is None:
+ self.policy_constraint_values = []
+ else:
+ self.policy_constraint_values = policy_constraint_values
+
+ @property
+ def name(self):
+ return getattr(self, 'id')
+
+ @name.setter
+ def name(self, value):
+ setattr(self, 'id', value)
+
+ @property
+ def class_id(self):
+ return getattr(self, 'classId')
+
+ @class_id.setter
+ def class_id(self, value):
+ setattr(self, 'classId', value)
+
+ @property
+ def policy_constraint_values(self):
+ return getattr(self, 'constraint')
+
+ @policy_constraint_values.setter
+ def policy_constraint_values(self, value):
+ setattr(self, 'constraint', value)
+
+ @classmethod
+ def from_json(cls, json_value):
+ policy_constraint = cls()
+ if 'id' in json_value:
+ policy_constraint.name = json_value['id']
+ if 'description' in json_value:
+ policy_constraint.description = json_value['description']
+ if 'classId' in json_value:
+ policy_constraint.class_id = json_value['classId']
+ if 'constraint' in json_value:
+ constraints = json_value['constraint']
+ if not isinstance(constraints, types.ListType):
+ policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraints))
+ else:
+ for constraint in constraints:
+ policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraint))
+
+ return policy_constraint
+
+
+class ProfilePolicy(object):
+ """
+ This class represents the policy a profile adheres to.
+ An object of this class stores the default values for profile and the constraints present on the
+ values of the attributes of the profile submitted for an enrollment request.
+ """
+
+ def __init__(self, policy_id=None, policy_default=None, policy_constraint=None):
+ self.policy_id = policy_id
+ self.policy_default = policy_default
+ self.policy_constraint = policy_constraint
+
+ @property
+ def policy_id(self):
+ return getattr(self, 'id')
+
+ @policy_id.setter
+ def policy_id(self, value):
+ setattr(self, 'id', value)
+
+ @property
+ def policy_default(self):
+ return getattr(self, 'def')
+
+ @policy_default.setter
+ def policy_default(self, value):
+ setattr(self, 'def', value)
+
+ @property
+ def policy_constraint(self):
+ return getattr(self, 'constraint')
+
+ @policy_constraint.setter
+ def policy_constraint(self, value):
+ setattr(self, 'constraint', value)
+
+ @classmethod
+ def from_json(cls, json_value):
+ return cls(json_value['id'], PolicyDefault.from_json(json_value['def']),
+ PolicyConstraint.from_json(json_value['constraint']))
+
+
+class ProfilePolicySet(object):
+ """
+ Stores a list of ProfilePolicy objects.
+ """
+ def __init__(self):
+ self.policies = []
+
+ @classmethod
+ def from_json(cls, attr_list):
+ policy_set = cls()
+
+ policies = attr_list['policies']
+ if not isinstance(policies, types.ListType):
+ policy_set.policies.append(ProfilePolicy.from_json(policies))
+ else:
+ for policy in policies:
+ policy_set.policies.append(ProfilePolicy.from_json(policy))
+
+ return policy_set