diff options
-rw-r--r-- | base/common/python/pki/cert.py | 694 | ||||
-rw-r--r-- | base/common/python/pki/profile.py | 577 |
2 files changed, 1261 insertions, 10 deletions
diff --git a/base/common/python/pki/cert.py b/base/common/python/pki/cert.py index c01410485..b22307ad1 100644 --- a/base/common/python/pki/cert.py +++ b/base/common/python/pki/cert.py @@ -4,11 +4,14 @@ Created on Feb 13, 2014 @author: akoneru """ +import copy import json +import types + import pki import pki.client as client import pki.encoder as encoder -import types +import pki.profile as profile class CertId(object): @@ -104,8 +107,8 @@ class CertDataInfo(object): class CertDataInfoCollection(object): """ - Class containing lists of CertDataInfo objects. - This data is returned when searching/listing certificate records on the CA. + Class containing list of CertDataInfo objects and their respective link objects. + This data is returned when searching/listing certificate records in the CA. """ def __init__(self): @@ -162,7 +165,7 @@ class CertRequestInfo(object): cert_request_info.request_id = \ str(cert_request_info.request_url)[(str(cert_request_info.request_url).rfind("/") + 1):] #Optional parameters - if 'certID' in attr_list: + if 'certId' in attr_list: cert_request_info.cert_id = attr_list['certId'] if 'certURL' in attr_list: cert_request_info.cert_url = attr_list['certURL'] @@ -174,6 +177,37 @@ class CertRequestInfo(object): return cert_request_info +class CertRequestInfoCollection(object): + """ + Class containing list of CertRequestInfo objects. + This data is returned when listing certificate request records in the CA. + """ + + def __init__(self): + self.cert_info_list = [] + self.links = [] + + @classmethod + def from_json(cls, json_value): + """ Populate object from JSON input """ + ret = cls() + cert_req_infos = json_value['entries'] + if not isinstance(cert_req_infos, types.ListType): + ret.cert_info_list.append(CertRequestInfo.from_json(cert_req_infos)) + else: + for cert_info in cert_req_infos: + ret.cert_info_list.append(CertRequestInfo.from_json(cert_info)) + + links = json_value['Link'] + if not isinstance(links, types.ListType): + ret.links.append(pki.Link.from_json(links)) + else: + for link in links: + ret.links.append(pki.Link.from_json(link)) + + return ret + + class CertSearchRequest(object): """ An object of this class is used to store the search parameters @@ -285,6 +319,330 @@ class CertRevokeRequest(object): setattr(self, "Comments", comments) +class CertEnrollmentRequest(object): + """ + This class encapsulates the parameters required for a certificate enrollment request. + """ + + def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None, + inputs=None, outputs=None): + """ Constructor """ + self.profile_id = profile_id + self.renewal = renewal + self.serial_number = serial_number + self.remote_host = remote_host + self.remote_address = remote_address + if inputs is None: + self.inputs = [] + if outputs is None: + self.outputs = [] + + @property + def profile_id(self): + return getattr(self, 'ProfileID', None) + + @profile_id.setter + def profile_id(self, value): + setattr(self, 'ProfileID', value) + + @property + def renewal(self): + return getattr(self, 'Renewal', False) + + @renewal.setter + def renewal(self, value): + setattr(self, 'Renewal', value) + + @property + def serial_number(self): + return getattr(self, 'SerialNumber', None) + + @serial_number.setter + def serial_number(self, value): + setattr(self, 'SerialNumber', value) + + @property + def remote_host(self): + return getattr(self, 'RemoteHost', None) + + @remote_host.setter + def remote_host(self, value): + setattr(self, 'RemoteHost', value) + + @property + def remote_address(self): + return getattr(self, 'RemoteAddress', None) + + @remote_address.setter + def remote_address(self, value): + setattr(self, 'RemoteAddress', value) + + @property + def inputs(self): + return getattr(self, 'Input') + + @inputs.setter + def inputs(self, value): + setattr(self, 'Input', value) + + @property + def outputs(self): + return getattr(self, 'Output') + + @outputs.setter + def outputs(self, value): + setattr(self, 'Output', value) + + def add_input(self, profile_input): + self.inputs.append(profile_input) + + def remove_input(self, profile_input_name): + for profile_input in self.inputs: + if profile_input_name == profile_input.name: + self.inputs.pop(profile_input) + break + + def get_input(self, profile_input_name): + for profile_input in self.inputs: + if profile_input_name == profile_input.name: + return profile_input + + return None + + def add_output(self, profile_output): + self.outputs.append(profile_output) + + def remove_output(self, profile_output_name): + for output in self.outputs: + if profile_output_name == output.name: + self.outputs.pop(output) + break + + def get_output(self, profile_output_name): + for output in self.outputs: + if profile_output_name == output.name: + return output + + return None + + @classmethod + def from_json(cls, json_value): + enroll_request = cls() + + enroll_request.profile_id = json_value['ProfileID'] + enroll_request.renewal = json_value['Renewal'] + if 'SerialNumber' in json_value: + enroll_request.serial_number = json_value['SerialNumber'] + if 'RemoteHost' in json_value: + enroll_request.remote_host = json_value['RemoteHost'] + if 'RemoteAddress' in json_value: + enroll_request.remote_address = json_value['RemoteAddress'] + + inputs = json_value['Input'] + if not isinstance(inputs, types.ListType): + enroll_request.inputs.append(profile.ProfileInput.from_json(inputs)) + else: + for profile_input in inputs: + enroll_request.inputs.append(profile.ProfileInput.from_json(profile_input)) + + outputs = json_value['Output'] + if not isinstance(outputs, types.ListType): + enroll_request.outputs.append(profile.ProfileOutput.from_json(outputs)) + else: + for profile_output in outputs: + enroll_request.outputs.append(profile.ProfileOutput.from_json(profile_output)) + + return enroll_request + + +class CertReviewResponse(CertEnrollmentRequest): + """ + An object of this class represent the response from the server when + reviewing a certificate enrollment request. + It contains a nonce required to perform action on the request. + """ + + def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None, + inputs=None, outputs=None, nonce=None, request_id=None, request_type=None, request_status=None, + request_owner=None, request_creation_time=None, request_modification_time=None, request_notes=None, + profile_approval_by=None, profile_set_id=None, profile_is_visible=None, profile_name=None, + profile_description=None, profile_remote_host=None, profile_remote_address=None, policy_sets=None): + + super(CertReviewResponse, self).__init__(profile_id, renewal, serial_number, remote_host, + remote_address, inputs, outputs) + self.nonce = nonce + self.request_id = request_id + self.request_type = request_type + self.request_status = request_status + self.request_owner = request_owner + self.request_creation_time = request_creation_time + self.request_modification_time = request_modification_time + self.request_notes = request_notes + self.profile_approved_by = profile_approval_by + self.profile_set_id = profile_set_id + self.profile_is_visible = profile_is_visible + self.profile_name = profile_name + self.profile_description = profile_description + self.profile_remote_host = profile_remote_host + self.profile_remote_address = profile_remote_address + + if policy_sets is None: + self.policy_sets = [] + else: + self.policy_sets = policy_sets + + @property + def request_id(self): + return getattr(self, 'requestId') + + @request_id.setter + def request_id(self, value): + setattr(self, 'requestId', value) + + @property + def request_type(self): + return getattr(self, 'requestType') + + @request_type.setter + def request_type(self, value): + setattr(self, 'requestType', value) + + @property + def request_status(self): + return getattr(self, 'requestStatus') + + @request_status.setter + def request_status(self, value): + setattr(self, 'requestStatus', value) + + @property + def request_owner(self): + return getattr(self, 'requestOwner') + + @request_owner.setter + def request_owner(self, value): + setattr(self, 'requestOwner', value) + + @property + def request_creation_time(self): + return getattr(self, 'requestCreationTime') + + @request_creation_time.setter + def request_creation_time(self, value): + setattr(self, 'requestCreationTime', value) + + @property + def request_modification_time(self): + return getattr(self, 'requestModificationTime') + + @request_modification_time.setter + def request_modification_time(self, value): + setattr(self, 'requestModificationTime', value) + + @property + def request_notes(self): + return getattr(self, 'requestNotes') + + @request_notes.setter + def request_notes(self, value): + setattr(self, 'requestNotes', value) + + @property + def profile_approved_by(self): + return getattr(self, 'profileApprovedBy') + + @profile_approved_by.setter + def profile_approved_by(self, value): + setattr(self, 'profileApprovedBy', value) + + @property + def profile_set_id(self): + return getattr(self, 'profileSetId') + + @profile_set_id.setter + def profile_set_id(self, value): + setattr(self, 'profileSetId', value) + + @property + def profile_is_visible(self): + return getattr(self, 'profileIsVisible') + + @profile_is_visible.setter + def profile_is_visible(self, value): + setattr(self, 'profileIsVisible', value) + + @property + def profile_name(self): + return getattr(self, 'profileName') + + @profile_name.setter + def profile_name(self, value): + setattr(self, 'profileName', value) + + @property + def profile_description(self): + return getattr(self, 'profileDescription') + + @profile_description.setter + def profile_description(self, value): + setattr(self, 'profileDescription', value) + + @property + def profile_remote_host(self): + return getattr(self, 'profileRemoteHost') + + @profile_remote_host.setter + def profile_remote_host(self, value): + setattr(self, 'profileRemoteHost', value) + + @property + def profile_remote_address(self): + return getattr(self, 'profileRemoteAddr') + + @profile_remote_address.setter + def profile_remote_address(self, value): + setattr(self, 'profileRemoteAddr', value) + + @property + def policy_sets(self): + return getattr(self, 'ProfilePolicySet') + + @policy_sets.setter + def policy_sets(self, value): + setattr(self, 'ProfilePolicySet', value) + + @classmethod + def from_json(cls, json_value): + + #First read the values for attributes defined in CertEnrollmentRequest + review_response = super(CertReviewResponse, cls).from_json(json_value) + + review_response.nonce = json_value['nonce'] + review_response.request_id = json_value['requestId'] + review_response.request_type = json_value['requestType'] + review_response.request_status = json_value['requestStatus'] + review_response.request_owner = json_value['requestOwner'] + review_response.request_creation_time = json_value['requestCreationTime'] + review_response.request_modification_time = json_value['requestModificationTime'] + review_response.request_notes = json_value['requestNotes'] + review_response.profile_approved_by = json_value['profileApprovedBy'] + review_response.profile_set_id = json_value['profileSetId'] + review_response.profile_is_visible = json_value['profileIsVisible'] + review_response.profile_name = json_value['profileName'] + review_response.profile_description = json_value['profileDescription'] + review_response.profile_remote_host = json_value['profileRemoteHost'] + review_response.profile_remote_address = json_value['profileRemoteAddr'] + + profile_policy_sets = json_value['ProfilePolicySet'] + if not isinstance(profile_policy_sets, types.ListType): + review_response.policy_sets.append(profile.ProfilePolicySet.from_json(profile_policy_sets)) + else: + for policy_set in profile_policy_sets: + review_response.policy_sets.append(profile.ProfilePolicySet.from_json(policy_set)) + + return review_response + + class CertClient(object): """ Class encapsulating and mirroring the functionality in the CertResource Java interface class @@ -298,6 +656,8 @@ class CertClient(object): 'Accept': 'application/json'} self.cert_url = '/rest/certs' self.agent_cert_url = '/rest/agent/certs' + self.cert_requests_url = '/rest/certrequests' + self.agent_cert_requests_url = '/rest/agent/certrequests' self.enrollment_templates = {} @pki.handle_exceptions() @@ -399,10 +759,251 @@ class CertClient(object): r = self.connection.post(url, None, headers=self.headers) return CertRequestInfo.from_json(r.json()) + @pki.handle_exceptions() + def get_request(self, request_id): + """ + Get information of a certificate request with the given request ID. + Returns a CertRequestInfo object. + """ + + if request_id is None: + raise ValueError("Request ID must be specified") + url = self.cert_requests_url + '/' + str(request_id) + r = self.connection.get(url, headers=self.headers) + + return CertRequestInfo.from_json(r.json()) + + @pki.handle_exceptions() + def list_requests(self, request_status=None, request_type=None, from_request_id=None, size=None, + max_results=None, max_time=None): + """ + Query for a list of certificates using the arguments passed. + Returns a CertRequestInfoCollection object. + """ + + query_params = { + 'requestStatus': request_status, + 'requestType': request_type, + 'start': from_request_id, + 'pageSize': size, + 'maxResults': max_results, + 'maxTime': max_time + } + r = self.connection.get(self.agent_cert_requests_url, self.headers, query_params) + return CertRequestInfoCollection.from_json(r.json()) + + @pki.handle_exceptions() + def review_request(self, request_id): + """ + Reviews a certificate enrollment request. + Returns a CertReviewResponse object which contains the nonce + from the server needed to perform an action on the request. + """ + if request_id is None: + raise ValueError("Request Id must be specified.") + + url = self.agent_cert_requests_url + '/' + str(request_id) + r = self.connection.get(url, headers=self.headers) + return CertReviewResponse.from_json(r.json()) + + @pki.handle_exceptions() + def _perform_action(self, request_id, cert_review_response, action): + """ + An internal method used by all the action methods to perform + an action on a certificate request. + The parameter cert_review_response + """ + if request_id is None: + raise ValueError("Request Id must be specified.") + if cert_review_response is None: + cert_review_response = self.review_request(request_id) + + url = self.agent_cert_requests_url + '/' + request_id + '/' + action + review_response = json.dumps(cert_review_response, cls=encoder.CustomTypeEncoder, sort_keys=True) + r = self.connection.post(url, review_response, headers=self.headers) + return r + + def approve_request(self, request_id, cert_review_response=None): + """ + Approves a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'approve') + + def cancel_request(self, request_id, cert_review_response=None): + """ + Cancels a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'cancel') + + def reject_request(self, request_id, cert_review_response=None): + """ + Rejects a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'reject') + + def validate_request(self, request_id, cert_review_response): + """ + Validates a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'validate') + + def update_request(self, request_id, cert_review_response): + """ + Updates a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'update') + + def assign_request(self, request_id, cert_review_response): + """ + Assigns a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'assign') + + def unassign_request(self, request_id, cert_review_response): + """ + Un-assigns a certificate enrollment request. + If cert_review_response is None, a review request operation is performed to fetch the + CertReviewResponse object. + Requires as agent level authentication. + """ + return self._perform_action(request_id, cert_review_response, 'unassign') + + @pki.handle_exceptions() + def list_enrollment_templates(self, start=None, size=None): + """ + Gets the list of profile templates supported by the CA. + The values for start and size arguments determine the starting point and the length of the list. + Returns a ProfileDataInfoCollection object. + """ + + url = self.cert_requests_url + '/profiles' + query_params = { + 'start': start, + 'size': size + } + r = self.connection.get(url, self.headers, query_params) + print r + return profile.ProfileDataInfoCollection.from_json(r.json()) + + @pki.handle_exceptions() + def get_enrollment_template(self, profile_id): + """ + Fetch the enrollment template for the given profile id. + For the first time, the request is sent to the server. + The retrieved CertEnrollmentRequest object is then cached locally for future requests. + Returns a CerEnrollmentRequest object. + """ + + if profile_id in self.enrollment_templates: + return copy.deepcopy(self.enrollment_templates[profile_id]) + url = self.cert_requests_url + '/profiles/' + str(profile_id) + r = self.connection.get(url, self.headers) + + #Caching the enrollment template object in-memory for future use. + enrollment_template = CertEnrollmentRequest.from_json(r.json()) + self.enrollment_templates[profile_id] = enrollment_template + + return copy.deepcopy(enrollment_template) + + @pki.handle_exceptions() + def create_enrollment_request(self, profile_id, inputs): + """ + Fetches the enrollment request object for the given profile and + sets values to its attributes using the values provided in the inputs dictionary. + Returns the CertEnrollmentRequest object, which can be submitted to enroll a certificate. + """ + if inputs is None or len(inputs) == 0: + raise ValueError("No inputs provided.") + + enrollment_template = self.get_enrollment_template(profile_id) + for profile_input in enrollment_template.inputs: + for attribute in profile_input.attributes: + if attribute.name in inputs: + attribute.value = inputs[attribute.name] + + return enrollment_template + + @pki.handle_exceptions() + def submit_enrollment_request(self, enrollment_request): + """ + Submits the CertEnrollmentRequest object to the server. + Returns a CertRequestInfoCollection object with information about the certificate requests + enrolled at the CA. + """ + request_object = json.dumps(enrollment_request, cls=encoder.CustomTypeEncoder, sort_keys=True) + r = self.connection.post(self.cert_requests_url, request_object, self.headers) + return CertRequestInfoCollection.from_json(r.json()) + + @pki.handle_exceptions() + def enroll_cert(self, profile_id, inputs): + """ + A convenience method for enrolling a certificate for a given profile id. + The inputs parameter should be a dictionary with values for the profile attributes + for an enrollment request. + + Calling this method with valid arguments, creates an enrollment request, submits it + to the server, approves the certificate requests generated for the enrollment and + returns a list of CertData objects for all the certificates generated as part of this + enrollment. + + Note: This method supports only certificate enrollment where only one agent approval + is sufficient. + + Requires an agent level authentication. + """ + + # Create a CertEnrollmentRequest object using the inputs for the given profile id. + enroll_request = self.create_enrollment_request(profile_id, inputs) + + # Submit the enrollment request + cert_request_infos = self.submit_enrollment_request(enroll_request) + + # Approve the requests generated for the certificate enrollment. + # Fetch the CertData objects for all the certificates created and return to the caller. + + certificates = [] + for cert_request_info in cert_request_infos.cert_info_list: + request_id = cert_request_info.request_id + self.approve_request(request_id) + cert_id = self.get_request(request_id).cert_id + certificates.append(self.get_cert(cert_id)) + + return certificates + encoder.NOTYPES['CertData'] = CertData encoder.NOTYPES['CertSearchRequest'] = CertSearchRequest encoder.NOTYPES['CertRevokeRequest'] = CertRevokeRequest +encoder.NOTYPES['CertEnrollmentRequest'] = CertEnrollmentRequest +encoder.NOTYPES['ProfileInput'] = profile.ProfileInput +encoder.NOTYPES['ProfileAttribute'] = profile.ProfileAttribute +encoder.NOTYPES['Descriptor'] = profile.Descriptor +encoder.NOTYPES['ProfileOutput'] = profile.ProfileOutput +encoder.NOTYPES['CertReviewResponse'] = CertReviewResponse +encoder.NOTYPES['ProfilePolicySet'] = profile.ProfilePolicySet +encoder.NOTYPES['ProfilePolicy'] = profile.ProfilePolicy +encoder.NOTYPES['PolicyDefault'] = profile.PolicyDefault +encoder.NOTYPES['PolicyConstraint'] = profile.PolicyConstraint +encoder.NOTYPES['PolicyConstraintValue'] = profile.PolicyConstraintValue +encoder.NOTYPES['ProfileParameter'] = profile.ProfileParameter def main(): @@ -416,16 +1017,76 @@ def main(): #Instantiate the CertClient cert_client = CertClient(connection) + cert_client.get_enrollment_template('caUserCert') + + #Enrolling an user certificate + print('Enrolling an user certificate') + print('-----------------------------') + + inputs = dict() + inputs['cert_request_type'] = 'crmf' + inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgNVBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQAD" \ + "gY0AMIGJAoGBAK/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9UuIhGfXOtxy5vRetmDHE" \ + "9u0AopmuJbr1rL17/tSnDakpkE9umQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0eCy2a" \ + "a4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJl" \ + "Z1Rva2VuMBoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkqhkiG9w0BAQUFAAOBgQCuywnr" \ + "Dk/wGwfbguw9oVs9gzFQwM4zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6ZmDd8WpPJ+" \ + "MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REhJw==" + inputs['sn_uid'] = 'test12345' + inputs['sn_e'] = 'example@redhat.com' + inputs['sn_cn'] = 'TestUser' + + cert_data_infos = cert_client.enroll_cert('caUserCert', inputs) + + for data in cert_data_infos: + print('Serial Number: ' + data.serial_number) + print('Issuer: ' + data.issuer_dn) + print('Subject: ' + data.subject_dn) + print('Pretty Print:') + print(data.pretty_repr) + + print + + # Enrolling a server certificate + print("Enrolling a server certificate") + print('------------------------------') + + inputs = dict() + inputs['cert_request_type'] = 'pkcs10' + inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwE" \ + "wYDVQQKDAxSZWQgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wDQYJKoZIhvcNAQEBBQADgY" \ + "0AMIGJAoGBAMJpWz92dSYCvWxllrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69jQMTLZMa8" \ + "FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJmcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8y" \ + "HwV0eBx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQBvkxAGKwkfK3TKwLc5Mg0IWp8zG" \ + "RVwxdIlghAL8DugNocCNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5Uje2K/+6tW2ZTGrb" \ + "izs7CNOTMzA/9H8CkHb4H9P/qRT275zHIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg==" + + inputs['requestor_name'] = 'Tester' + inputs['requestor_email'] = 'example@redhat.com' + + cert_data_infos_2 = cert_client.enroll_cert('caServerCert', inputs) + for data in cert_data_infos_2: + print('Serial Number: ' + data.serial_number) + print('Issuer: ' + data.issuer_dn) + print('Subject: ' + data.subject_dn) + print('Pretty Print:') + print(data.pretty_repr) + + print + # List all the VALID certs + print('An example listing all VALID certs') + print('----------------------------------') + search_params = {'status': 'VALID'} - cert_data_infos = cert_client.list_certs(**search_params) - for cert_data_info in cert_data_infos.cert_info_list: + cert_data_list = cert_client.list_certs(**search_params) + for cert_data_info in cert_data_list.cert_info_list: print("Serial Number: " + cert_data_info.cert_id) print("Subject DN: " + cert_data_info.subject_dn) print("Status: " + cert_data_info.status) print - #Trying an invalid get cert + #Trying to get a non-existing cert #Assuming that there is no certificate with serial number = 100 try: cert_data = cert_client.get_cert(100) @@ -437,10 +1098,14 @@ def main(): print # Certificate Serial Number used for CertClient methods. - # 7 and '0x7' are also valid values - cert_id = 0x7 + # 7, 0x7 and '0x7' are also valid values + # Following examples use the serial number of the user certificate enrolled before. + cert_id = cert_data_infos[0].serial_number #Get certificate data + print('Getting information of a certificate') + print('------------------------------------') + cert_data = cert_client.get_cert(cert_id) # Print the certificate information print('Serial Number: ' + cert_data.serial_number) @@ -456,6 +1121,9 @@ def main(): print # Review a certificate - used to get a nonce for revoke request. + print('Reviewing a certificate') + print('-----------------------') + cert_data = cert_client.review_cert(cert_id) print('Serial Number: ' + cert_data.serial_number) print('Issuer: ' + cert_data.issuer_dn) @@ -465,6 +1133,9 @@ def main(): print #Revoke a certificate + print('Revoking a certificate') + print('----------------------') + cert_request_info = cert_client.revoke_cert(cert_data.serial_number, revocation_reason=CertRevokeRequest.REASON_CERTIFICATE_HOLD, comments="Test revoking a cert", nonce=cert_data.nonce) @@ -474,6 +1145,9 @@ def main(): print #Un-revoke a certificate + print('Un-revoking a certificate') + print('-------------------------') + cert_request_info = cert_client.unrevoke_cert(cert_data.serial_number) print('Request ID: ' + cert_request_info.request_id) print('Request Type: ' + cert_request_info.request_type) @@ -482,4 +1156,4 @@ def main(): if __name__ == "__main__": - main() + main()
\ No newline at end of file diff --git a/base/common/python/pki/profile.py b/base/common/python/pki/profile.py new file mode 100644 index 000000000..490db0994 --- /dev/null +++ b/base/common/python/pki/profile.py @@ -0,0 +1,577 @@ +#!/usr/bin/python +""" +Created on May 13,, 2014 + +@author: akoneru +""" + +import types + +import pki + + +class ProfileDataInfo(object): + """Stores information about a profile""" + def __init__(self): + self.profile_id = None + self.profile_name = None + self.profile_description = None + self.profile_url = None + + @classmethod + def from_json(cls, attr_list): + profile_data_info = cls() + profile_data_info.profile_id = attr_list['profileId'] + profile_data_info.profile_name = attr_list['profileName'] + profile_data_info.profile_description = attr_list['profileDescription'] + profile_data_info.profile_url = attr_list['profileURL'] + + return profile_data_info + + +class ProfileDataInfoCollection(object): + """ + Represents a collection of ProfileDataInfo objects. + Also encapsulates the links for the list of the objects stored. + """ + + def __init__(self): + self.profile_data_list = [] + self.links = [] + + @classmethod + def from_json(cls, json_value): + ret = cls() + profile_data_infos = json_value['entries'] + if not isinstance(profile_data_infos, types.ListType): + ret.profile_data_list.append(ProfileDataInfo.from_json(profile_data_infos)) + else: + for profile_info in profile_data_infos: + ret.profile_data_list.append(ProfileDataInfo.from_json(profile_info)) + + links = json_value['Link'] + if not isinstance(links, types.ListType): + ret.links.append(pki.Link.from_json(links)) + else: + for link in links: + ret.links.append(pki.Link.from_json(link)) + + return ret + + +class Descriptor(object): + """ + This class represents the description of a ProfileAttribute. + It stores information such as the syntax, constraint and default value of a profile attribute. + """ + + def __init__(self, syntax=None, constraint=None, description=None, default_value=None): + self.syntax = syntax + self.constraint = constraint + self.description = description + self.default_value = default_value + + @property + def syntax(self): + return getattr(self, 'Syntax', None) + + @syntax.setter + def syntax(self, value): + setattr(self, 'Syntax', value) + + @property + def constraint(self): + return getattr(self, 'Constraint', None) + + @constraint.setter + def constraint(self, value): + setattr(self, 'Constraint', value) + + @property + def description(self): + return getattr(self, 'Description', None) + + @description.setter + def description(self, value): + setattr(self, 'Description', value) + + @property + def default_value(self): + return getattr(self, 'DefaultValue', None) + + @default_value.setter + def default_value(self, value): + setattr(self, 'DefaultValue', value) + + @classmethod + def from_json(cls, attr_list): + descriptor = cls() + for attr in attr_list: + setattr(descriptor, attr, attr_list[attr]) + + return descriptor + + +class ProfileAttribute(object): + """ + Represents a profile attribute of a ProfileInput. + """ + def __init__(self, name=None, value=None, descriptor=None): + self.name = name + self.value = value + self.descriptor = descriptor + + @property + def descriptor(self): + return getattr(self, 'Descriptor') + + @descriptor.setter + def descriptor(self, value): + setattr(self, 'Descriptor', value) + + @property + def value(self): + return getattr(self, 'Value') + + @value.setter + def value(self, value): + setattr(self, 'Value', value) + + @classmethod + def from_json(cls, attr_list): + attribute = cls() + attribute.name = attr_list['name'] + if 'Value' in attr_list: + attribute.value = attr_list['Value'] + if 'Descriptor' in attr_list: + attribute.descriptor = Descriptor.from_json(attr_list['Descriptor']) + + return attribute + + +class ProfileInput(object): + """ + This class encapsulates all the attributes of a profile to generate a + specific property of a certificate. + Ex. Subject name, Requestor Information etc. + """ + + def __init__(self, profile_input_id=None, class_id=None, name=None, text=None, attributes=None, + config_attributes=None): + + self.profile_input_id = profile_input_id + self.class_id = class_id + self.name = name + self.text = text + if attributes is None: + self.attributes = [] + if config_attributes is None: + self.config_attributes = [] + + @property + def profile_input_id(self): + return getattr(self, 'id') + + @profile_input_id.setter + def profile_input_id(self, value): + setattr(self, 'id', value) + + @property + def class_id(self): + return getattr(self, 'ClassID', None) + + @class_id.setter + def class_id(self, value): + setattr(self, 'ClassID', value) + + @property + def name(self): + return getattr(self, 'Name', None) + + @name.setter + def name(self, value): + setattr(self, 'Name', value) + + @property + def text(self): + return getattr(self, 'Text', None) + + @text.setter + def text(self, value): + setattr(self, 'Text', value) + + @property + def attributes(self): + return getattr(self, 'Attribute') + + @attributes.setter + def attributes(self, value): + setattr(self, 'Attribute', value) + + @property + def config_attributes(self): + return getattr(self, 'ConfigAttribute') + + @config_attributes.setter + def config_attributes(self, value): + setattr(self, 'ConfigAttribute', value) + + def add_attribute(self, profile_attribute): + self.attributes.append(profile_attribute) + + def remove_attribute(self, profile_attribute_name): + for attr in self.attributes: + if attr.name == profile_attribute_name: + self.attributes.remove(attr) + break + + def get_attribute(self, profile_attribute_name): + for attr in self.attributes: + if attr.name == profile_attribute_name: + return attr + + return None + + def add_config_attribute(self, profile_attribute): + self.attributes.append(profile_attribute) + + def remove_config_attribute(self, config_attribute_name): + for attr in self.config_attributes: + if attr.name == config_attribute_name: + self.attributes.remove(attr) + break + + def get_config_attribute(self, config_attribute_name): + for attr in self.attributes: + if attr.name == config_attribute_name: + return attr + + return None + + @classmethod + def from_json(cls, json_value): + profile_input = cls() + profile_input.profile_input_id = json_value['id'] + profile_input.class_id = json_value['ClassID'] + profile_input.name = json_value['Name'] + if 'Text' in json_value: + profile_input.text = json_value['Text'] + + attributes = json_value['Attribute'] + if not isinstance(attributes, types.ListType): + profile_input.attributes.append(ProfileAttribute.from_json(attributes)) + else: + for profile_info in attributes: + profile_input.attributes.append(ProfileAttribute.from_json(profile_info)) + + config_attributes = json_value['ConfigAttribute'] + if not isinstance(config_attributes, types.ListType): + profile_input.config_attributes.append(ProfileAttribute.from_json(config_attributes)) + else: + for config_attribute in config_attributes: + profile_input.config_attributes.append(ProfileAttribute.from_json(config_attribute)) + + return profile_input + + +class ProfileOutput(object): + """ + This class defines the output of a certificate enrollment request + using a profile. + """ + + def __init__(self, profile_output_id=None, name=None, text=None, class_id=None, attributes=None): + self.profile_output_id = profile_output_id + self.name = name + self.text = text + self.class_id = class_id + if attributes is None: + self.attributes = [] + + @property + def profile_output_id(self): + return getattr(self, 'id') + + @profile_output_id.setter + def profile_output_id(self, value): + setattr(self, 'id', value) + + @property + def class_id(self): + return getattr(self, 'classId', None) + + @class_id.setter + def class_id(self, value): + setattr(self, 'classId', value) + + def add_attribute(self, profile_attribute): + self.attributes.append(profile_attribute) + + def remove_attribute(self, profile_attribute_name): + for attr in self.attributes: + if attr.name == profile_attribute_name: + self.attributes.remove(attr) + break + + def get_attribute(self, profile_attribute_name): + for attr in self.attributes: + if attr.name == profile_attribute_name: + return attr + + return None + + @classmethod + def from_json(cls, json_value): + profile_output = cls() + profile_output.profile_output_id = json_value['id'] + profile_output.name = json_value['name'] + profile_output.text = json_value['text'] + profile_output.class_id = json_value['classId'] + attributes = json_value['attributes'] + if not isinstance(attributes, types.ListType): + profile_output.attributes.append(ProfileAttribute.from_json(attributes)) + else: + for profile_info in attributes: + profile_output.attributes.append(ProfileAttribute.from_json(profile_info)) + return profile_output + + +class ProfileParameter(object): + + def __init__(self, name=None, value=None): + self.name = name + self.value = value + + @classmethod + def from_json(cls, attr_list): + param = cls() + for attr in attr_list: + setattr(param, attr, attr_list[attr]) + return param + + +class PolicyDefault(object): + """ + An object of this class contains information of the default usage of a specific ProfileInput. + """ + + def __init__(self, name=None, class_id=None, description=None, policy_attributes=None, policy_params=None): + self.name = name + self.class_id = class_id + self.description = description + if policy_attributes is None: + self.policy_attributes = [] + else: + self.policy_attributes = policy_attributes + if policy_params is None: + self.policy_params = [] + else: + self.policy_params = policy_params + + @property + def name(self): + return getattr(self, 'id') + + @name.setter + def name(self, value): + setattr(self, 'id', value) + + @property + def class_id(self): + return getattr(self, 'classId') + + @class_id.setter + def class_id(self, value): + setattr(self, 'classId', value) + + @property + def policy_attributes(self): + return getattr(self, 'policyAttribute') + + @policy_attributes.setter + def policy_attributes(self, value): + setattr(self, 'policyAttribute', value) + + @property + def policy_params(self): + return getattr(self, 'params') + + @policy_params.setter + def policy_params(self, value): + setattr(self, 'params', value) + + @classmethod + def from_json(cls, json_value): + policy_def = cls() + if 'id' in json_value: + policy_def.name = json_value['id'] + if 'classId' in json_value: + policy_def.class_id = json_value['classId'] + if 'description' in json_value: + policy_def.description = json_value['description'] + if 'policyAttribute' in json_value: + attributes = json_value['policyAttribute'] + if not isinstance(attributes, types.ListType): + policy_def.policy_attributes.append(ProfileAttribute.from_json(attributes)) + else: + for attr in attributes: + policy_def.policy_attributes.append(ProfileAttribute.from_json(attr)) + + if 'params' in json_value: + params = json_value['params'] + if not isinstance(params, types.ListType): + policy_def.policy_params.append(ProfileParameter.from_json(params)) + else: + for param in params: + policy_def.policy_params.append(ProfileParameter.from_json(param)) + + return policy_def + + +class PolicyConstraintValue(object): + + def __init__(self, name=None, value=None, descriptor=None): + self.name = name + self.value = value + self.descriptor = descriptor + + @property + def name(self): + return getattr(self, 'id') + + @name.setter + def name(self, value): + setattr(self, 'id', value) + + @classmethod + def from_json(cls, json_value): + ret = cls() + + ret.name = json_value['id'] + ret.value = json_value['value'] + if 'descriptor' in json_value: + ret.descriptor = Descriptor.from_json(json_value['descriptor']) + + return ret + + +class PolicyConstraint(object): + """ + An object of this class contains the policy constraints applied to a ProfileInput + used by a certificate enrollment request. + """ + + def __init__(self, name=None, description=None, class_id=None, policy_constraint_values=None): + self.name = name + self.description = description + self.class_id = class_id + if policy_constraint_values is None: + self.policy_constraint_values = [] + else: + self.policy_constraint_values = policy_constraint_values + + @property + def name(self): + return getattr(self, 'id') + + @name.setter + def name(self, value): + setattr(self, 'id', value) + + @property + def class_id(self): + return getattr(self, 'classId') + + @class_id.setter + def class_id(self, value): + setattr(self, 'classId', value) + + @property + def policy_constraint_values(self): + return getattr(self, 'constraint') + + @policy_constraint_values.setter + def policy_constraint_values(self, value): + setattr(self, 'constraint', value) + + @classmethod + def from_json(cls, json_value): + policy_constraint = cls() + if 'id' in json_value: + policy_constraint.name = json_value['id'] + if 'description' in json_value: + policy_constraint.description = json_value['description'] + if 'classId' in json_value: + policy_constraint.class_id = json_value['classId'] + if 'constraint' in json_value: + constraints = json_value['constraint'] + if not isinstance(constraints, types.ListType): + policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraints)) + else: + for constraint in constraints: + policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraint)) + + return policy_constraint + + +class ProfilePolicy(object): + """ + This class represents the policy a profile adheres to. + An object of this class stores the default values for profile and the constraints present on the + values of the attributes of the profile submitted for an enrollment request. + """ + + def __init__(self, policy_id=None, policy_default=None, policy_constraint=None): + self.policy_id = policy_id + self.policy_default = policy_default + self.policy_constraint = policy_constraint + + @property + def policy_id(self): + return getattr(self, 'id') + + @policy_id.setter + def policy_id(self, value): + setattr(self, 'id', value) + + @property + def policy_default(self): + return getattr(self, 'def') + + @policy_default.setter + def policy_default(self, value): + setattr(self, 'def', value) + + @property + def policy_constraint(self): + return getattr(self, 'constraint') + + @policy_constraint.setter + def policy_constraint(self, value): + setattr(self, 'constraint', value) + + @classmethod + def from_json(cls, json_value): + return cls(json_value['id'], PolicyDefault.from_json(json_value['def']), + PolicyConstraint.from_json(json_value['constraint'])) + + +class ProfilePolicySet(object): + """ + Stores a list of ProfilePolicy objects. + """ + def __init__(self): + self.policies = [] + + @classmethod + def from_json(cls, attr_list): + policy_set = cls() + + policies = attr_list['policies'] + if not isinstance(policies, types.ListType): + policy_set.policies.append(ProfilePolicy.from_json(policies)) + else: + for policy in policies: + policy_set.policies.append(ProfilePolicy.from_json(policy)) + + return policy_set |