summaryrefslogtreecommitdiffstats
path: root/base/common/python/pki/cert.py
diff options
context:
space:
mode:
authorAbhishek Koneru <akoneru@redhat.com>2014-05-23 12:17:38 -0400
committerAbhishek Koneru <akoneru@redhat.com>2014-06-03 02:39:12 -0400
commit5b7c76832dc72c85d9dd9db16f861f3283fa7eb0 (patch)
tree9d721618324da863e9ac635ae5779225c8d33aeb /base/common/python/pki/cert.py
parent1d772fad871e4d0e0500f266e99d17973b4c3dca (diff)
downloadpki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.tar.gz
pki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.tar.xz
pki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.zip
Addressed comments given for patches 92-2, 93, 94.
Addressed review comments for the patches that implement the CertClient and a part of ProfileClient. Also includes the pycharm project files in pki/.idea.
Diffstat (limited to 'base/common/python/pki/cert.py')
-rw-r--r--base/common/python/pki/cert.py493
1 files changed, 305 insertions, 188 deletions
diff --git a/base/common/python/pki/cert.py b/base/common/python/pki/cert.py
index b22307ad1..036bbf4e3 100644
--- a/base/common/python/pki/cert.py
+++ b/base/common/python/pki/cert.py
@@ -14,16 +14,6 @@ import pki.encoder as encoder
import pki.profile as profile
-class CertId(object):
- """
- Class encapsulating a certificate serial number
- """
-
- def __init__(self, cert_id):
- """ Constructor """
- self.value = cert_id
-
-
class CertData(object):
"""
Class containing certificate data as returned from getCert()
@@ -43,6 +33,16 @@ class CertData(object):
self.nonce = None
self.link = None
+ def __repr__(self):
+ attributes = {
+ "CertData": {
+ "serial_number": self.serial_number,
+ "subject_dn": self.subject_dn,
+ "status": self.status
+ }
+ }
+ return str(attributes)
+
@classmethod
def from_json(cls, attr_list):
""" Return CertData object from JSON dict """
@@ -72,7 +72,7 @@ class CertDataInfo(object):
def __init__(self):
""" Constructor """
- self.cert_id = None
+ self.serial_number = None
self.subject_dn = None
self.status = None
self.type = None
@@ -85,11 +85,21 @@ class CertDataInfo(object):
self.issued_by = None
self.link = None
+ def __repr__(self):
+ obj = {
+ "CertDataInfo": {
+ 'serial_number': self.serial_number,
+ 'subject_dn': self.subject_dn,
+ 'type': self.type,
+ 'status': self.status
+ }}
+ return str(obj)
+
@classmethod
def from_json(cls, attr_list):
""" Return CertDataInfo object from JSON dict """
cert_data_info = cls()
- cert_data_info.cert_id = attr_list['id']
+ cert_data_info.serial_number = attr_list['id']
cert_data_info.subject_dn = attr_list['SubjectDN']
cert_data_info.status = attr_list['Status']
cert_data_info.type = attr_list['Type']
@@ -107,25 +117,30 @@ class CertDataInfo(object):
class CertDataInfoCollection(object):
"""
- Class containing list of CertDataInfo objects and their respective link objects.
+ Class containing list of CertDataInfo objects and their respective link
+ objects.
This data is returned when searching/listing certificate records in the CA.
"""
def __init__(self):
""" Constructor """
- self.cert_info_list = []
+ self.cert_data_info_list = []
self.links = []
+ def __iter__(self):
+ return iter(self.cert_data_info_list)
+
@classmethod
def from_json(cls, json_value):
""" Populate object from JSON input """
ret = cls()
cert_infos = json_value['entries']
if not isinstance(cert_infos, types.ListType):
- ret.cert_info_list.append(CertDataInfo.from_json(cert_infos))
+ ret.cert_data_info_list.append(CertDataInfo.from_json(cert_infos))
else:
for cert_info in cert_infos:
- ret.cert_info_list.append(CertDataInfo.from_json(cert_info))
+ ret.cert_data_info_list.append(
+ CertDataInfo.from_json(cert_info))
links = json_value['Link']
if not isinstance(links, types.ListType):
@@ -155,6 +170,17 @@ class CertRequestInfo(object):
self.cert_url = None
self.error_message = None
+ def __repr__(self):
+ obj = {
+ 'CertRequestInfo': {
+ 'request_id': self.request_id,
+ 'request_type': self.request_type,
+ 'request_status': self.request_status,
+ 'request_url': self.request_url
+ }
+ }
+ return str(obj)
+
@classmethod
def from_json(cls, attr_list):
cert_request_info = cls()
@@ -163,7 +189,8 @@ class CertRequestInfo(object):
cert_request_info.request_status = attr_list['requestStatus']
cert_request_info.operation_result = attr_list['operationResult']
cert_request_info.request_id = \
- str(cert_request_info.request_url)[(str(cert_request_info.request_url).rfind("/") + 1):]
+ str(cert_request_info.request_url)[(str(
+ cert_request_info.request_url).rfind("/") + 1):]
#Optional parameters
if 'certId' in attr_list:
cert_request_info.cert_id = attr_list['certId']
@@ -184,19 +211,24 @@ class CertRequestInfoCollection(object):
"""
def __init__(self):
- self.cert_info_list = []
+ self.cert_request_info_list = []
self.links = []
+ def __iter__(self):
+ return iter(self.cert_request_info_list)
+
@classmethod
def from_json(cls, json_value):
""" Populate object from JSON input """
ret = cls()
cert_req_infos = json_value['entries']
if not isinstance(cert_req_infos, types.ListType):
- ret.cert_info_list.append(CertRequestInfo.from_json(cert_req_infos))
+ ret.cert_request_info_list.append(
+ CertRequestInfo.from_json(cert_req_infos))
else:
for cert_info in cert_req_infos:
- ret.cert_info_list.append(CertRequestInfo.from_json(cert_info))
+ ret.cert_request_info_list.append(
+ CertRequestInfo.from_json(cert_info))
links = json_value['Link']
if not isinstance(links, types.ListType):
@@ -215,18 +247,28 @@ class CertSearchRequest(object):
"""
search_params = {'serial_to': 'serialTo', 'serial_from': 'serialFrom',
- 'email': 'eMail', 'common_name': 'commonName', 'user_id': 'userID',
- 'org_unit': 'orgUnit', 'org': 'org', 'locality': 'locality',
- 'state': 'state', 'country': 'country', 'match_exactly': 'matchExactly',
- 'status': 'status', 'revoked_by': 'revokedBy', 'revoked_on_from': 'revokedOnFrom',
- 'revoked_on_to': 'revokedOnTo', 'revocation_reason': 'revocationReason',
- 'issued_by': 'issuedBy', 'issued_on_from': 'issuedOnFrom', 'issued_on_to': 'issuedOnTo',
- 'valid_not_before_from': 'validNotBeforeFrom', 'valid_not_before_to': 'validNotBeforeTo',
- 'valid_not_after_from': 'validNotAfterFrom', 'valid_not_after_to': 'validNotAfterTo',
- 'validity_operation': 'validityOperation', 'validity_count': 'validityCount',
- 'validity_unit': 'validityUnit', 'cert_type_sub_email_ca': 'certTypeSubEmailCA',
- 'cert_type_sub_ssl_ca': 'certTypeSubSSLCA', 'cert_type_secure_email': 'certTypeSecureEmail',
- 'cert_type_ssl_client': 'certTypeSSLClient', 'cert_type_ssl_server': 'certTypeSSLServer'}
+ 'email': 'eMail', 'common_name': 'commonName',
+ 'user_id': 'userID', 'org_unit': 'orgUnit', 'org': 'org',
+ 'locality': 'locality', 'state': 'state',
+ 'country': 'country', 'match_exactly': 'matchExactly',
+ 'status': 'status', 'revoked_by': 'revokedBy',
+ 'revoked_on_from': 'revokedOnFrom',
+ 'revoked_on_to': 'revokedOnTo',
+ 'revocation_reason': 'revocationReason',
+ 'issued_by': 'issuedBy', 'issued_on_from': 'issuedOnFrom',
+ 'issued_on_to': 'issuedOnTo',
+ 'valid_not_before_from': 'validNotBeforeFrom',
+ 'valid_not_before_to': 'validNotBeforeTo',
+ 'valid_not_after_from': 'validNotAfterFrom',
+ 'valid_not_after_to': 'validNotAfterTo',
+ 'validity_operation': 'validityOperation',
+ 'validity_count': 'validityCount',
+ 'validity_unit': 'validityUnit',
+ 'cert_type_sub_email_ca': 'certTypeSubEmailCA',
+ 'cert_type_sub_ssl_ca': 'certTypeSubSSLCA',
+ 'cert_type_secure_email': 'certTypeSecureEmail',
+ 'cert_type_ssl_client': 'certTypeSSLClient',
+ 'cert_type_ssl_server': 'certTypeSSLServer'}
def __init__(self, **cert_search_params):
""" Constructor """
@@ -234,59 +276,64 @@ class CertSearchRequest(object):
if len(cert_search_params) == 0:
setattr(self, 'serialNumberRangeInUse', True)
- for param in cert_search_params:
+ for param, value in cert_search_params.viewitems():
if not param in CertSearchRequest.search_params:
raise ValueError('Invalid search parameter: ' + param)
- if param == 'serial_to' or param == 'serial_from':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'serial_to', 'serial_from'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'serialNumberRangeInUse', True)
- if param == 'email' or param == 'common_name' or param == 'user_id' or param == 'org_unit' \
- or param == 'org' or param == 'locality' or param == 'state' or param == 'country' \
- or param == 'match_exactly':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'email', 'common_name', 'user_id', 'org_unit', 'org',
+ 'locality', 'state', 'country', 'match_exactly'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'subjectInUse', True)
if param == 'status':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
if param == 'revoked_by':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revokedByInUse', True)
- if param == 'revoked_on_from' or param == 'revoked_on_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'revoked_on_from', 'revoked_on_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revokedOnInUse', True)
if param == 'revocation_reason':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revocationReasonInUse', True)
if param == 'issued_by':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'issuedByInUse', True)
- if param == 'issued_on_from' or param == 'issued_on_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'issued_on_from', 'issued_on_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'issuedOnInUse', True)
- if param == 'valid_not_before_from' or param == 'valid_not_before_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'valid_not_before_from', 'valid_not_before_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validNotBeforeInUse', True)
- if param == 'valid_not_after_from' or param == 'valid_not_after_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'valid_not_after_from', 'valid_not_after_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validNotAfterInUse', True)
- if param == 'validity_operation' or param == 'validity_count' or param == 'validity_unit':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'validity_operation', 'validity_count', 'validity_unit'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validityLengthInUse', True)
- if param == 'cert_type_sub_email_ca' or param == 'cert_type_sub_ssl_ca' \
- or param == 'cert_type_secure_email' or param == 'cert_type_ssl_client' \
- or param == 'cert_type_ssl_server':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'cert_type_sub_email_ca', 'cert_type_sub_ssl_ca',
+ 'cert_type_secure_email', 'cert_type_ssl_client',
+ 'cert_type_ssl_server'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'certTypeInUse', True)
@@ -294,24 +341,28 @@ class CertRevokeRequest(object):
"""
An object of this class encapsulates all the
parameters required for revoking a certificate.
- """
- REASON_UNSPECIFIED = "Unspecified"
- REASON_KEY_COMPROMISE = "Key_Compromise"
- REASON_CA_COMPROMISE = "CA_Compromise"
- REASON_AFFILIATION_CHANGED = "Affiliation_Changed"
- REASON_SUPERSEDED = "Superseded"
- REASON_CESSATION_OF_OPERATION = "Cessation_of_Operation"
- REASON_CERTIFICATE_HOLD = "Certificate_Hold"
- REASON_REMOVE_FROM_CRL = "Remove_from_CRL"
- REASON_PRIVILEGE_WITHDRAWN = "Privilege_Withdrawn"
- REASON_AA_COMPROMISE = "AA_Compromise"
+ Valid values for reasons for revoking a request are:
+ 'Unspecified', 'Key_Compromise', 'CA_Compromise',
+ 'Affiliation_Changed', 'Superseded', 'Cessation_of_Operation',
+ 'Certificate_Hold', 'Remove_from_CRL', 'Privilege_Withdrawn',
+ 'AA_Compromise'
+ """
+ reasons = ['Unspecified', 'Key_Compromise', 'CA_Compromise',
+ 'Affiliation_Changed', 'Superseded', 'Cessation_of_Operation',
+ 'Certificate_Hold', 'Remove_from_CRL', 'Privilege_Withdrawn',
+ 'AA_Compromise']
def __init__(self, nonce, reason=None, invalidity_date=None, comments=None):
""" Constructor """
+
setattr(self, "Nonce", nonce)
+
if reason is None:
- reason = self.REASON_UNSPECIFIED
+ reason = 'Unspecified'
+ else:
+ if reason not in CertRevokeRequest.reasons:
+ raise ValueError('Invalid revocation reason specified.')
setattr(self, "Reason", reason)
if invalidity_date is not None:
setattr(self, "InvalidityDate", invalidity_date)
@@ -321,11 +372,13 @@ class CertRevokeRequest(object):
class CertEnrollmentRequest(object):
"""
- This class encapsulates the parameters required for a certificate enrollment request.
+ This class encapsulates the parameters required for a certificate
+ enrollment request.
"""
- def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
- inputs=None, outputs=None):
+ def __init__(self, profile_id=None, renewal=False, serial_number=None,
+ remote_host=None, remote_address=None, inputs=None,
+ outputs=None):
""" Constructor """
self.profile_id = profile_id
self.renewal = renewal
@@ -443,14 +496,17 @@ class CertEnrollmentRequest(object):
enroll_request.inputs.append(profile.ProfileInput.from_json(inputs))
else:
for profile_input in inputs:
- enroll_request.inputs.append(profile.ProfileInput.from_json(profile_input))
+ enroll_request.inputs.append(
+ profile.ProfileInput.from_json(profile_input))
outputs = json_value['Output']
if not isinstance(outputs, types.ListType):
- enroll_request.outputs.append(profile.ProfileOutput.from_json(outputs))
+ enroll_request.outputs.append(
+ profile.ProfileOutput.from_json(outputs))
else:
for profile_output in outputs:
- enroll_request.outputs.append(profile.ProfileOutput.from_json(profile_output))
+ enroll_request.outputs.append(
+ profile.ProfileOutput.from_json(profile_output))
return enroll_request
@@ -462,14 +518,21 @@ class CertReviewResponse(CertEnrollmentRequest):
It contains a nonce required to perform action on the request.
"""
- def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
- inputs=None, outputs=None, nonce=None, request_id=None, request_type=None, request_status=None,
- request_owner=None, request_creation_time=None, request_modification_time=None, request_notes=None,
- profile_approval_by=None, profile_set_id=None, profile_is_visible=None, profile_name=None,
- profile_description=None, profile_remote_host=None, profile_remote_address=None, policy_sets=None):
-
- super(CertReviewResponse, self).__init__(profile_id, renewal, serial_number, remote_host,
- remote_address, inputs, outputs)
+ def __init__(self, profile_id=None, renewal=False, serial_number=None,
+ remote_host=None, remote_address=None, inputs=None,
+ outputs=None, nonce=None, request_id=None, request_type=None,
+ request_status=None, request_owner=None,
+ request_creation_time=None, request_modification_time=None,
+ request_notes=None, profile_approval_by=None,
+ profile_set_id=None, profile_is_visible=None,
+ profile_name=None, profile_description=None,
+ profile_remote_host=None, profile_remote_address=None,
+ policy_sets=None):
+
+ super(CertReviewResponse, self).__init__(profile_id, renewal,
+ serial_number, remote_host,
+ remote_address, inputs,
+ outputs)
self.nonce = nonce
self.request_id = request_id
self.request_type = request_type
@@ -622,8 +685,10 @@ class CertReviewResponse(CertEnrollmentRequest):
review_response.request_type = json_value['requestType']
review_response.request_status = json_value['requestStatus']
review_response.request_owner = json_value['requestOwner']
- review_response.request_creation_time = json_value['requestCreationTime']
- review_response.request_modification_time = json_value['requestModificationTime']
+ review_response.request_creation_time = \
+ json_value['requestCreationTime']
+ review_response.request_modification_time = \
+ json_value['requestModificationTime']
review_response.request_notes = json_value['requestNotes']
review_response.profile_approved_by = json_value['profileApprovedBy']
review_response.profile_set_id = json_value['profileSetId']
@@ -635,18 +700,20 @@ class CertReviewResponse(CertEnrollmentRequest):
profile_policy_sets = json_value['ProfilePolicySet']
if not isinstance(profile_policy_sets, types.ListType):
- review_response.policy_sets.append(profile.ProfilePolicySet.from_json(profile_policy_sets))
+ review_response.policy_sets.append(
+ profile.ProfilePolicySet.from_json(profile_policy_sets))
else:
for policy_set in profile_policy_sets:
- review_response.policy_sets.append(profile.ProfilePolicySet.from_json(policy_set))
+ review_response.policy_sets.append(
+ profile.ProfilePolicySet.from_json(policy_set))
return review_response
class CertClient(object):
"""
- Class encapsulating and mirroring the functionality in the CertResource Java interface class
- defining the REST API for Certificate resources.
+ Class encapsulating and mirroring the functionality in the CertResource
+ Java interface class defining the REST API for Certificate resources.
"""
def __init__(self, connection):
@@ -661,101 +728,122 @@ class CertClient(object):
self.enrollment_templates = {}
@pki.handle_exceptions()
- def get_cert(self, cert_id):
+ def get_cert(self, cert_serial_number):
""" Return a CertData object for a particular certificate. """
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.cert_url + '/' + str(cert_id)
+ url = self.cert_url + '/' + str(cert_serial_number)
r = self.connection.get(url, self.headers)
return CertData.from_json(r.json())
@pki.handle_exceptions()
- def list_certs(self, max_results=None, max_time=None, start=None, size=None, **cert_search_params):
- """ Return a CertDataInfoCollection object with a information about all the
- certificates that satisfy the search criteria.
+ def list_certs(self, max_results=None, max_time=None, start=None, size=None,
+ **cert_search_params):
+ """ Return a CertDataInfoCollection object with a information about all
+ the certificates that satisfy the search criteria.
If cert_search_request=None, returns all the certificates.
"""
url = self.cert_url + '/search'
- query_params = {"maxResults": max_results, "maxTime": max_time, "start": start, "size": size}
+ query_params = {"maxResults": max_results, "maxTime": max_time,
+ "start": start, "size": size}
cert_search_request = CertSearchRequest(**cert_search_params)
- search_request = json.dumps(cert_search_request, cls=encoder.CustomTypeEncoder, sort_keys=True)
- response = self.connection.post(url, search_request, self.headers, query_params)
+ search_request = json.dumps(cert_search_request,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
+ response = self.connection.post(url, search_request, self.headers,
+ query_params)
return CertDataInfoCollection.from_json(response.json())
@pki.handle_exceptions()
- def review_cert(self, cert_id):
+ def review_cert(self, cert_serial_number):
""" Reviews a certificate. Returns a CertData object with a nonce.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.agent_cert_url + '/' + str(cert_id)
+ url = self.agent_cert_url + '/' + str(cert_serial_number)
r = self.connection.get(url, self.headers)
return CertData.from_json(r.json())
- def _submit_revoke_request(self, url, cert_id, revocation_reason=None, invalidity_date=None, comments=None,
- nonce=None):
+ def _submit_revoke_request(self, url, cert_serial_number,
+ revocation_reason=None, invalidity_date=None,
+ comments=None, nonce=None):
"""
Submits a certificate revocation request.
Expects the URL for submitting the request.
Creates a CertRevokeRequest object using the arguments provided.
- If nonce is passed as an argument, reviews the cert to get a nonce from the server
- and passes it in the request.
+ If nonce is passed as an argument, reviews the cert to get a nonce
+ from the server and passes it in the request.
Returns a CertRequestInfo object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
if url is None:
raise ValueError("URL not specified")
if nonce is None:
- cert_data = self.review_cert(cert_id)
+ cert_data = self.review_cert(cert_serial_number)
nonce = cert_data.nonce
- request = CertRevokeRequest(nonce, revocation_reason, invalidity_date, comments)
- revoke_request = json.dumps(request, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ request = CertRevokeRequest(nonce, revocation_reason, invalidity_date,
+ comments)
+ revoke_request = json.dumps(request, cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
r = self.connection.post(url, revoke_request, headers=self.headers)
return CertRequestInfo.from_json(r.json())
@pki.handle_exceptions()
- def revoke_cert(self, cert_id, revocation_reason=None, invalidity_date=None, comments=None, nonce=None):
+ def revoke_cert(self, cert_serial_number, revocation_reason=None,
+ invalidity_date=None, comments=None, nonce=None):
""" Revokes a certificate.
Returns a CertRequestInfo object with information about the request.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- url = self.agent_cert_url + '/' + str(cert_id) + '/revoke'
- return self._submit_revoke_request(url, cert_id, revocation_reason, invalidity_date, comments, nonce)
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/revoke'
+ return self._submit_revoke_request(url, cert_serial_number,
+ revocation_reason, invalidity_date,
+ comments, nonce)
@pki.handle_exceptions()
- def revoke_ca_cert(self, cert_id, revocation_reason=None, invalidity_date=None, comments=None, nonce=None):
+ def revoke_ca_cert(self, cert_serial_number, revocation_reason=None,
+ invalidity_date=None, comments=None, nonce=None):
""" Revokes a CA certificate.
Returns a CertRequestInfo object with information about the request.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- url = self.agent_cert_url + '/' + str(cert_id) + '/revoke-ca'
- return self._submit_revoke_request(url, cert_id, revocation_reason, invalidity_date, comments, nonce)
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/revoke-ca'
+ return self._submit_revoke_request(url, cert_serial_number,
+ revocation_reason, invalidity_date,
+ comments, nonce)
@pki.handle_exceptions()
- def hold_cert(self, cert_id, comments=None):
+ def hold_cert(self, cert_serial_number, comments=None):
""" Places a certificate on-hold.
- Calls the revoke_cert method with reason - CertRevokeRequest.REASON_CERTIFICATE_HOLD.
+ Calls the revoke_cert method with reason -
+ CertRevokeRequest.REASON_CERTIFICATE_HOLD.
Returns a CertRequestInfo object.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- return self.revoke_cert(cert_id, CertRevokeRequest.REASON_CERTIFICATE_HOLD, comments=comments)
+ return self.revoke_cert(cert_serial_number, 'Certificate_Hold',
+ comments=comments)
@pki.handle_exceptions()
- def unrevoke_cert(self, cert_id):
+ def unrevoke_cert(self, cert_serial_number):
""" Un-revokes a revoked certificate.
Returns a CertRequestInfo object.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.agent_cert_url + '/' + str(cert_id) + '/unrevoke'
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/unrevoke'
r = self.connection.post(url, None, headers=self.headers)
return CertRequestInfo.from_json(r.json())
@@ -774,8 +862,9 @@ class CertClient(object):
return CertRequestInfo.from_json(r.json())
@pki.handle_exceptions()
- def list_requests(self, request_status=None, request_type=None, from_request_id=None, size=None,
- max_results=None, max_time=None):
+ def list_requests(self, request_status=None, request_type=None,
+ from_request_id=None, size=None, max_results=None,
+ max_time=None):
"""
Query for a list of certificates using the arguments passed.
Returns a CertRequestInfoCollection object.
@@ -789,7 +878,8 @@ class CertClient(object):
'maxResults': max_results,
'maxTime': max_time
}
- r = self.connection.get(self.agent_cert_requests_url, self.headers, query_params)
+ r = self.connection.get(self.agent_cert_requests_url, self.headers,
+ query_params)
return CertRequestInfoCollection.from_json(r.json())
@pki.handle_exceptions()
@@ -819,15 +909,17 @@ class CertClient(object):
cert_review_response = self.review_request(request_id)
url = self.agent_cert_requests_url + '/' + request_id + '/' + action
- review_response = json.dumps(cert_review_response, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ review_response = json.dumps(cert_review_response,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
r = self.connection.post(url, review_response, headers=self.headers)
return r
def approve_request(self, request_id, cert_review_response=None):
"""
Approves a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'approve')
@@ -835,17 +927,17 @@ class CertClient(object):
def cancel_request(self, request_id, cert_review_response=None):
"""
Cancels a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'cancel')
- def reject_request(self, request_id, cert_review_response=None):
+ def reject_request(self, request_id, cert_review_response=None):
"""
Rejects a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'reject')
@@ -853,17 +945,18 @@ class CertClient(object):
def validate_request(self, request_id, cert_review_response):
"""
Validates a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
- return self._perform_action(request_id, cert_review_response, 'validate')
+ return self._perform_action(request_id, cert_review_response,
+ 'validate')
def update_request(self, request_id, cert_review_response):
"""
Updates a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'update')
@@ -871,8 +964,8 @@ class CertClient(object):
def assign_request(self, request_id, cert_review_response):
"""
Assigns a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'assign')
@@ -880,17 +973,19 @@ class CertClient(object):
def unassign_request(self, request_id, cert_review_response):
"""
Un-assigns a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
- return self._perform_action(request_id, cert_review_response, 'unassign')
+ return self._perform_action(request_id, cert_review_response,
+ 'unassign')
@pki.handle_exceptions()
def list_enrollment_templates(self, start=None, size=None):
"""
Gets the list of profile templates supported by the CA.
- The values for start and size arguments determine the starting point and the length of the list.
+ The values for start and size arguments determine the starting point and
+ the length of the list.
Returns a ProfileDataInfoCollection object.
"""
@@ -900,7 +995,6 @@ class CertClient(object):
'size': size
}
r = self.connection.get(url, self.headers, query_params)
- print r
return profile.ProfileDataInfoCollection.from_json(r.json())
@pki.handle_exceptions()
@@ -908,10 +1002,13 @@ class CertClient(object):
"""
Fetch the enrollment template for the given profile id.
For the first time, the request is sent to the server.
- The retrieved CertEnrollmentRequest object is then cached locally for future requests.
+ The retrieved CertEnrollmentRequest object is then cached locally for
+ future requests.
Returns a CerEnrollmentRequest object.
"""
+ if profile_id is None:
+ raise ValueError("Profile ID must be specified.")
if profile_id in self.enrollment_templates:
return copy.deepcopy(self.enrollment_templates[profile_id])
url = self.cert_requests_url + '/profiles/' + str(profile_id)
@@ -927,8 +1024,10 @@ class CertClient(object):
def create_enrollment_request(self, profile_id, inputs):
"""
Fetches the enrollment request object for the given profile and
- sets values to its attributes using the values provided in the inputs dictionary.
- Returns the CertEnrollmentRequest object, which can be submitted to enroll a certificate.
+ sets values to its attributes using the values provided in the inputs
+ dictionary.
+ Returns the CertEnrollmentRequest object, which can be submitted to
+ enroll a certificate.
"""
if inputs is None or len(inputs) == 0:
raise ValueError("No inputs provided.")
@@ -945,42 +1044,48 @@ class CertClient(object):
def submit_enrollment_request(self, enrollment_request):
"""
Submits the CertEnrollmentRequest object to the server.
- Returns a CertRequestInfoCollection object with information about the certificate requests
- enrolled at the CA.
+ Returns a CertRequestInfoCollection object with information about the
+ certificate requests enrolled at the CA.
"""
- request_object = json.dumps(enrollment_request, cls=encoder.CustomTypeEncoder, sort_keys=True)
- r = self.connection.post(self.cert_requests_url, request_object, self.headers)
+ request_object = json.dumps(enrollment_request,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
+ r = self.connection.post(self.cert_requests_url, request_object,
+ self.headers)
return CertRequestInfoCollection.from_json(r.json())
@pki.handle_exceptions()
def enroll_cert(self, profile_id, inputs):
"""
A convenience method for enrolling a certificate for a given profile id.
- The inputs parameter should be a dictionary with values for the profile attributes
- for an enrollment request.
+ The inputs parameter should be a dictionary with values for the profile
+ attributes for an enrollment request.
- Calling this method with valid arguments, creates an enrollment request, submits it
- to the server, approves the certificate requests generated for the enrollment and
- returns a list of CertData objects for all the certificates generated as part of this
- enrollment.
+ Calling this method with valid arguments, creates an enrollment request,
+ submits it to the server, approves the certificate requests generated
+ for the enrollment and returns a list of CertData objects for all the
+ certificates generated as part of this enrollment.
- Note: This method supports only certificate enrollment where only one agent approval
- is sufficient.
+ Note: This method supports only certificate enrollment where only one
+ agent approval is sufficient.
Requires an agent level authentication.
+ Returns a list of CertData objects.
"""
- # Create a CertEnrollmentRequest object using the inputs for the given profile id.
+ # Create a CertEnrollmentRequest object using the inputs for the given
+ # profile id.
enroll_request = self.create_enrollment_request(profile_id, inputs)
# Submit the enrollment request
cert_request_infos = self.submit_enrollment_request(enroll_request)
# Approve the requests generated for the certificate enrollment.
- # Fetch the CertData objects for all the certificates created and return to the caller.
+ # Fetch the CertData objects for all the certificates created and
+ # return to the caller.
certificates = []
- for cert_request_info in cert_request_infos.cert_info_list:
+ for cert_request_info in cert_request_infos.cert_request_info_list:
request_id = cert_request_info.request_id
self.approve_request(request_id)
cert_id = self.get_request(request_id).cert_id
@@ -1010,7 +1115,8 @@ def main():
# Create a PKIConnection object that stores the details of the CA.
connection = client.PKIConnection('https', 'localhost', '8443', 'ca')
- # The pem file used for authentication. Created from a p12 file using the command -
+ # The pem file used for authentication. Created from a p12 file using the
+ # command -
# openssl pkcs12 -in <p12_file_path> -out /tmp/auth.pem -nodes
connection.set_authentication_cert("/tmp/auth.pem")
@@ -1025,13 +1131,19 @@ def main():
inputs = dict()
inputs['cert_request_type'] = 'crmf'
- inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgNVBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQAD" \
- "gY0AMIGJAoGBAK/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9UuIhGfXOtxy5vRetmDHE" \
- "9u0AopmuJbr1rL17/tSnDakpkE9umQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0eCy2a" \
- "a4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJl" \
- "Z1Rva2VuMBoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkqhkiG9w0BAQUFAAOBgQCuywnr" \
- "Dk/wGwfbguw9oVs9gzFQwM4zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6ZmDd8WpPJ+" \
- "MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REhJw=="
+ inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgN" \
+ "VBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAK" \
+ "/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9U" \
+ "uIhGfXOtxy5vRetmDHE9u0AopmuJbr1rL17/tSnDakpkE9u" \
+ "mQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0" \
+ "eCy2aa4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ" \
+ "8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJlZ1Rva2VuM" \
+ "BoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkq" \
+ "hkiG9w0BAQUFAAOBgQCuywnrDk/wGwfbguw9oVs9gzFQwM4" \
+ "zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6" \
+ "ZmDd8WpPJ+MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm" \
+ "7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REh" \
+ "Jw=="
inputs['sn_uid'] = 'test12345'
inputs['sn_e'] = 'example@redhat.com'
inputs['sn_cn'] = 'TestUser'
@@ -1053,13 +1165,18 @@ def main():
inputs = dict()
inputs['cert_request_type'] = 'pkcs10'
- inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwE" \
- "wYDVQQKDAxSZWQgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wDQYJKoZIhvcNAQEBBQADgY" \
- "0AMIGJAoGBAMJpWz92dSYCvWxllrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69jQMTLZMa8" \
- "FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJmcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8y" \
- "HwV0eBx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQBvkxAGKwkfK3TKwLc5Mg0IWp8zG" \
- "RVwxdIlghAL8DugNocCNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5Uje2K/+6tW2ZTGrb" \
- "izs7CNOTMzA/9H8CkHb4H9P/qRT275zHIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg=="
+ inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAg" \
+ "MAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwEwYDVQQKDAxSZW" \
+ "QgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wD" \
+ "QYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMJpWz92dSYCvWxl" \
+ "lrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69j" \
+ "QMTLZMa8FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJ" \
+ "mcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8yHwV0e" \
+ "Bx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOB" \
+ "gQBvkxAGKwkfK3TKwLc5Mg0IWp8zGRVwxdIlghAL8DugNoc" \
+ "CNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5" \
+ "Uje2K/+6tW2ZTGrbizs7CNOTMzA/9H8CkHb4H9P/qRT275z" \
+ "HIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg=="
inputs['requestor_name'] = 'Tester'
inputs['requestor_email'] = 'example@redhat.com'
@@ -1080,8 +1197,8 @@ def main():
search_params = {'status': 'VALID'}
cert_data_list = cert_client.list_certs(**search_params)
- for cert_data_info in cert_data_list.cert_info_list:
- print("Serial Number: " + cert_data_info.cert_id)
+ for cert_data_info in cert_data_list:
+ print("Serial Number: " + cert_data_info.serial_number)
print("Subject DN: " + cert_data_info.subject_dn)
print("Status: " + cert_data_info.status)
print
@@ -1099,7 +1216,8 @@ def main():
# Certificate Serial Number used for CertClient methods.
# 7, 0x7 and '0x7' are also valid values
- # Following examples use the serial number of the user certificate enrolled before.
+ # Following examples use the serial number of the user certificate enrolled
+ # before.
cert_id = cert_data_infos[0].serial_number
#Get certificate data
@@ -1136,9 +1254,8 @@ def main():
print('Revoking a certificate')
print('----------------------')
- cert_request_info = cert_client.revoke_cert(cert_data.serial_number,
- revocation_reason=CertRevokeRequest.REASON_CERTIFICATE_HOLD,
- comments="Test revoking a cert", nonce=cert_data.nonce)
+ cert_request_info = cert_client.hold_cert(cert_data.serial_number,
+ comments="Test revoking a cert")
print('Request ID: ' + cert_request_info.request_id)
print('Request Type: ' + cert_request_info.request_type)
print('Request Status: ' + cert_request_info.request_status)