summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAbhishek Koneru <akoneru@redhat.com>2014-05-23 12:17:38 -0400
committerAbhishek Koneru <akoneru@redhat.com>2014-06-03 02:39:12 -0400
commit5b7c76832dc72c85d9dd9db16f861f3283fa7eb0 (patch)
tree9d721618324da863e9ac635ae5779225c8d33aeb
parent1d772fad871e4d0e0500f266e99d17973b4c3dca (diff)
downloadpki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.tar.gz
pki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.tar.xz
pki-5b7c76832dc72c85d9dd9db16f861f3283fa7eb0.zip
Addressed comments given for patches 92-2, 93, 94.
Addressed review comments for the patches that implement the CertClient and a part of ProfileClient. Also includes the pycharm project files in pki/.idea.
-rw-r--r--base/common/python/pki/__init__.py6
-rw-r--r--base/common/python/pki/account.py3
-rw-r--r--base/common/python/pki/cert.py493
-rw-r--r--base/common/python/pki/profile.py145
4 files changed, 417 insertions, 230 deletions
diff --git a/base/common/python/pki/__init__.py b/base/common/python/pki/__init__.py
index 713f10e0e..891d6ea63 100644
--- a/base/common/python/pki/__init__.py
+++ b/base/common/python/pki/__init__.py
@@ -21,6 +21,7 @@
"""
This module contains top-level classes and functions used by the Dogtag project.
"""
+from functools import wraps
import os
import re
import requests
@@ -245,6 +246,7 @@ def handle_exceptions():
def exceptions_decorator(fn_call):
""" The actual decorator handler."""
+ @wraps(fn_call)
def handler(inst, *args, **kwargs):
""" Decorator to catch and re-throw PKIExceptions."""
try:
@@ -392,8 +394,10 @@ class PropertyFile(object):
class Link:
"""
- Stores the information of the resteasy's Link object sent by the server for a resource.
+ Stores the information of the resteasy's Link object sent by the server
+ for a resource.
"""
+
def __init__(self):
pass
diff --git a/base/common/python/pki/account.py b/base/common/python/pki/account.py
index 1ab5b2ddb..0916ec7cc 100644
--- a/base/common/python/pki/account.py
+++ b/base/common/python/pki/account.py
@@ -18,6 +18,7 @@
# Copyright (C) 2013 Red Hat, Inc.
# All rights reserved.
#
+import pki
class AccountClient:
@@ -25,8 +26,10 @@ class AccountClient:
def __init__(self, connection):
self.connection = connection
+ @pki.handle_exceptions()
def login(self):
self.connection.get('/rest/account/login')
+ @pki.handle_exceptions()
def logout(self):
self.connection.get('/rest/account/logout')
diff --git a/base/common/python/pki/cert.py b/base/common/python/pki/cert.py
index b22307ad1..036bbf4e3 100644
--- a/base/common/python/pki/cert.py
+++ b/base/common/python/pki/cert.py
@@ -14,16 +14,6 @@ import pki.encoder as encoder
import pki.profile as profile
-class CertId(object):
- """
- Class encapsulating a certificate serial number
- """
-
- def __init__(self, cert_id):
- """ Constructor """
- self.value = cert_id
-
-
class CertData(object):
"""
Class containing certificate data as returned from getCert()
@@ -43,6 +33,16 @@ class CertData(object):
self.nonce = None
self.link = None
+ def __repr__(self):
+ attributes = {
+ "CertData": {
+ "serial_number": self.serial_number,
+ "subject_dn": self.subject_dn,
+ "status": self.status
+ }
+ }
+ return str(attributes)
+
@classmethod
def from_json(cls, attr_list):
""" Return CertData object from JSON dict """
@@ -72,7 +72,7 @@ class CertDataInfo(object):
def __init__(self):
""" Constructor """
- self.cert_id = None
+ self.serial_number = None
self.subject_dn = None
self.status = None
self.type = None
@@ -85,11 +85,21 @@ class CertDataInfo(object):
self.issued_by = None
self.link = None
+ def __repr__(self):
+ obj = {
+ "CertDataInfo": {
+ 'serial_number': self.serial_number,
+ 'subject_dn': self.subject_dn,
+ 'type': self.type,
+ 'status': self.status
+ }}
+ return str(obj)
+
@classmethod
def from_json(cls, attr_list):
""" Return CertDataInfo object from JSON dict """
cert_data_info = cls()
- cert_data_info.cert_id = attr_list['id']
+ cert_data_info.serial_number = attr_list['id']
cert_data_info.subject_dn = attr_list['SubjectDN']
cert_data_info.status = attr_list['Status']
cert_data_info.type = attr_list['Type']
@@ -107,25 +117,30 @@ class CertDataInfo(object):
class CertDataInfoCollection(object):
"""
- Class containing list of CertDataInfo objects and their respective link objects.
+ Class containing list of CertDataInfo objects and their respective link
+ objects.
This data is returned when searching/listing certificate records in the CA.
"""
def __init__(self):
""" Constructor """
- self.cert_info_list = []
+ self.cert_data_info_list = []
self.links = []
+ def __iter__(self):
+ return iter(self.cert_data_info_list)
+
@classmethod
def from_json(cls, json_value):
""" Populate object from JSON input """
ret = cls()
cert_infos = json_value['entries']
if not isinstance(cert_infos, types.ListType):
- ret.cert_info_list.append(CertDataInfo.from_json(cert_infos))
+ ret.cert_data_info_list.append(CertDataInfo.from_json(cert_infos))
else:
for cert_info in cert_infos:
- ret.cert_info_list.append(CertDataInfo.from_json(cert_info))
+ ret.cert_data_info_list.append(
+ CertDataInfo.from_json(cert_info))
links = json_value['Link']
if not isinstance(links, types.ListType):
@@ -155,6 +170,17 @@ class CertRequestInfo(object):
self.cert_url = None
self.error_message = None
+ def __repr__(self):
+ obj = {
+ 'CertRequestInfo': {
+ 'request_id': self.request_id,
+ 'request_type': self.request_type,
+ 'request_status': self.request_status,
+ 'request_url': self.request_url
+ }
+ }
+ return str(obj)
+
@classmethod
def from_json(cls, attr_list):
cert_request_info = cls()
@@ -163,7 +189,8 @@ class CertRequestInfo(object):
cert_request_info.request_status = attr_list['requestStatus']
cert_request_info.operation_result = attr_list['operationResult']
cert_request_info.request_id = \
- str(cert_request_info.request_url)[(str(cert_request_info.request_url).rfind("/") + 1):]
+ str(cert_request_info.request_url)[(str(
+ cert_request_info.request_url).rfind("/") + 1):]
#Optional parameters
if 'certId' in attr_list:
cert_request_info.cert_id = attr_list['certId']
@@ -184,19 +211,24 @@ class CertRequestInfoCollection(object):
"""
def __init__(self):
- self.cert_info_list = []
+ self.cert_request_info_list = []
self.links = []
+ def __iter__(self):
+ return iter(self.cert_request_info_list)
+
@classmethod
def from_json(cls, json_value):
""" Populate object from JSON input """
ret = cls()
cert_req_infos = json_value['entries']
if not isinstance(cert_req_infos, types.ListType):
- ret.cert_info_list.append(CertRequestInfo.from_json(cert_req_infos))
+ ret.cert_request_info_list.append(
+ CertRequestInfo.from_json(cert_req_infos))
else:
for cert_info in cert_req_infos:
- ret.cert_info_list.append(CertRequestInfo.from_json(cert_info))
+ ret.cert_request_info_list.append(
+ CertRequestInfo.from_json(cert_info))
links = json_value['Link']
if not isinstance(links, types.ListType):
@@ -215,18 +247,28 @@ class CertSearchRequest(object):
"""
search_params = {'serial_to': 'serialTo', 'serial_from': 'serialFrom',
- 'email': 'eMail', 'common_name': 'commonName', 'user_id': 'userID',
- 'org_unit': 'orgUnit', 'org': 'org', 'locality': 'locality',
- 'state': 'state', 'country': 'country', 'match_exactly': 'matchExactly',
- 'status': 'status', 'revoked_by': 'revokedBy', 'revoked_on_from': 'revokedOnFrom',
- 'revoked_on_to': 'revokedOnTo', 'revocation_reason': 'revocationReason',
- 'issued_by': 'issuedBy', 'issued_on_from': 'issuedOnFrom', 'issued_on_to': 'issuedOnTo',
- 'valid_not_before_from': 'validNotBeforeFrom', 'valid_not_before_to': 'validNotBeforeTo',
- 'valid_not_after_from': 'validNotAfterFrom', 'valid_not_after_to': 'validNotAfterTo',
- 'validity_operation': 'validityOperation', 'validity_count': 'validityCount',
- 'validity_unit': 'validityUnit', 'cert_type_sub_email_ca': 'certTypeSubEmailCA',
- 'cert_type_sub_ssl_ca': 'certTypeSubSSLCA', 'cert_type_secure_email': 'certTypeSecureEmail',
- 'cert_type_ssl_client': 'certTypeSSLClient', 'cert_type_ssl_server': 'certTypeSSLServer'}
+ 'email': 'eMail', 'common_name': 'commonName',
+ 'user_id': 'userID', 'org_unit': 'orgUnit', 'org': 'org',
+ 'locality': 'locality', 'state': 'state',
+ 'country': 'country', 'match_exactly': 'matchExactly',
+ 'status': 'status', 'revoked_by': 'revokedBy',
+ 'revoked_on_from': 'revokedOnFrom',
+ 'revoked_on_to': 'revokedOnTo',
+ 'revocation_reason': 'revocationReason',
+ 'issued_by': 'issuedBy', 'issued_on_from': 'issuedOnFrom',
+ 'issued_on_to': 'issuedOnTo',
+ 'valid_not_before_from': 'validNotBeforeFrom',
+ 'valid_not_before_to': 'validNotBeforeTo',
+ 'valid_not_after_from': 'validNotAfterFrom',
+ 'valid_not_after_to': 'validNotAfterTo',
+ 'validity_operation': 'validityOperation',
+ 'validity_count': 'validityCount',
+ 'validity_unit': 'validityUnit',
+ 'cert_type_sub_email_ca': 'certTypeSubEmailCA',
+ 'cert_type_sub_ssl_ca': 'certTypeSubSSLCA',
+ 'cert_type_secure_email': 'certTypeSecureEmail',
+ 'cert_type_ssl_client': 'certTypeSSLClient',
+ 'cert_type_ssl_server': 'certTypeSSLServer'}
def __init__(self, **cert_search_params):
""" Constructor """
@@ -234,59 +276,64 @@ class CertSearchRequest(object):
if len(cert_search_params) == 0:
setattr(self, 'serialNumberRangeInUse', True)
- for param in cert_search_params:
+ for param, value in cert_search_params.viewitems():
if not param in CertSearchRequest.search_params:
raise ValueError('Invalid search parameter: ' + param)
- if param == 'serial_to' or param == 'serial_from':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'serial_to', 'serial_from'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'serialNumberRangeInUse', True)
- if param == 'email' or param == 'common_name' or param == 'user_id' or param == 'org_unit' \
- or param == 'org' or param == 'locality' or param == 'state' or param == 'country' \
- or param == 'match_exactly':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'email', 'common_name', 'user_id', 'org_unit', 'org',
+ 'locality', 'state', 'country', 'match_exactly'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'subjectInUse', True)
if param == 'status':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
if param == 'revoked_by':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revokedByInUse', True)
- if param == 'revoked_on_from' or param == 'revoked_on_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'revoked_on_from', 'revoked_on_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revokedOnInUse', True)
if param == 'revocation_reason':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'revocationReasonInUse', True)
if param == 'issued_by':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'issuedByInUse', True)
- if param == 'issued_on_from' or param == 'issued_on_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'issued_on_from', 'issued_on_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'issuedOnInUse', True)
- if param == 'valid_not_before_from' or param == 'valid_not_before_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'valid_not_before_from', 'valid_not_before_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validNotBeforeInUse', True)
- if param == 'valid_not_after_from' or param == 'valid_not_after_to':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {'valid_not_after_from', 'valid_not_after_to'}:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validNotAfterInUse', True)
- if param == 'validity_operation' or param == 'validity_count' or param == 'validity_unit':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'validity_operation', 'validity_count', 'validity_unit'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'validityLengthInUse', True)
- if param == 'cert_type_sub_email_ca' or param == 'cert_type_sub_ssl_ca' \
- or param == 'cert_type_secure_email' or param == 'cert_type_ssl_client' \
- or param == 'cert_type_ssl_server':
- setattr(self, CertSearchRequest.search_params[param], cert_search_params[param])
+ if param in {
+ 'cert_type_sub_email_ca', 'cert_type_sub_ssl_ca',
+ 'cert_type_secure_email', 'cert_type_ssl_client',
+ 'cert_type_ssl_server'
+ }:
+ setattr(self, CertSearchRequest.search_params[param], value)
setattr(self, 'certTypeInUse', True)
@@ -294,24 +341,28 @@ class CertRevokeRequest(object):
"""
An object of this class encapsulates all the
parameters required for revoking a certificate.
- """
- REASON_UNSPECIFIED = "Unspecified"
- REASON_KEY_COMPROMISE = "Key_Compromise"
- REASON_CA_COMPROMISE = "CA_Compromise"
- REASON_AFFILIATION_CHANGED = "Affiliation_Changed"
- REASON_SUPERSEDED = "Superseded"
- REASON_CESSATION_OF_OPERATION = "Cessation_of_Operation"
- REASON_CERTIFICATE_HOLD = "Certificate_Hold"
- REASON_REMOVE_FROM_CRL = "Remove_from_CRL"
- REASON_PRIVILEGE_WITHDRAWN = "Privilege_Withdrawn"
- REASON_AA_COMPROMISE = "AA_Compromise"
+ Valid values for reasons for revoking a request are:
+ 'Unspecified', 'Key_Compromise', 'CA_Compromise',
+ 'Affiliation_Changed', 'Superseded', 'Cessation_of_Operation',
+ 'Certificate_Hold', 'Remove_from_CRL', 'Privilege_Withdrawn',
+ 'AA_Compromise'
+ """
+ reasons = ['Unspecified', 'Key_Compromise', 'CA_Compromise',
+ 'Affiliation_Changed', 'Superseded', 'Cessation_of_Operation',
+ 'Certificate_Hold', 'Remove_from_CRL', 'Privilege_Withdrawn',
+ 'AA_Compromise']
def __init__(self, nonce, reason=None, invalidity_date=None, comments=None):
""" Constructor """
+
setattr(self, "Nonce", nonce)
+
if reason is None:
- reason = self.REASON_UNSPECIFIED
+ reason = 'Unspecified'
+ else:
+ if reason not in CertRevokeRequest.reasons:
+ raise ValueError('Invalid revocation reason specified.')
setattr(self, "Reason", reason)
if invalidity_date is not None:
setattr(self, "InvalidityDate", invalidity_date)
@@ -321,11 +372,13 @@ class CertRevokeRequest(object):
class CertEnrollmentRequest(object):
"""
- This class encapsulates the parameters required for a certificate enrollment request.
+ This class encapsulates the parameters required for a certificate
+ enrollment request.
"""
- def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
- inputs=None, outputs=None):
+ def __init__(self, profile_id=None, renewal=False, serial_number=None,
+ remote_host=None, remote_address=None, inputs=None,
+ outputs=None):
""" Constructor """
self.profile_id = profile_id
self.renewal = renewal
@@ -443,14 +496,17 @@ class CertEnrollmentRequest(object):
enroll_request.inputs.append(profile.ProfileInput.from_json(inputs))
else:
for profile_input in inputs:
- enroll_request.inputs.append(profile.ProfileInput.from_json(profile_input))
+ enroll_request.inputs.append(
+ profile.ProfileInput.from_json(profile_input))
outputs = json_value['Output']
if not isinstance(outputs, types.ListType):
- enroll_request.outputs.append(profile.ProfileOutput.from_json(outputs))
+ enroll_request.outputs.append(
+ profile.ProfileOutput.from_json(outputs))
else:
for profile_output in outputs:
- enroll_request.outputs.append(profile.ProfileOutput.from_json(profile_output))
+ enroll_request.outputs.append(
+ profile.ProfileOutput.from_json(profile_output))
return enroll_request
@@ -462,14 +518,21 @@ class CertReviewResponse(CertEnrollmentRequest):
It contains a nonce required to perform action on the request.
"""
- def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None,
- inputs=None, outputs=None, nonce=None, request_id=None, request_type=None, request_status=None,
- request_owner=None, request_creation_time=None, request_modification_time=None, request_notes=None,
- profile_approval_by=None, profile_set_id=None, profile_is_visible=None, profile_name=None,
- profile_description=None, profile_remote_host=None, profile_remote_address=None, policy_sets=None):
-
- super(CertReviewResponse, self).__init__(profile_id, renewal, serial_number, remote_host,
- remote_address, inputs, outputs)
+ def __init__(self, profile_id=None, renewal=False, serial_number=None,
+ remote_host=None, remote_address=None, inputs=None,
+ outputs=None, nonce=None, request_id=None, request_type=None,
+ request_status=None, request_owner=None,
+ request_creation_time=None, request_modification_time=None,
+ request_notes=None, profile_approval_by=None,
+ profile_set_id=None, profile_is_visible=None,
+ profile_name=None, profile_description=None,
+ profile_remote_host=None, profile_remote_address=None,
+ policy_sets=None):
+
+ super(CertReviewResponse, self).__init__(profile_id, renewal,
+ serial_number, remote_host,
+ remote_address, inputs,
+ outputs)
self.nonce = nonce
self.request_id = request_id
self.request_type = request_type
@@ -622,8 +685,10 @@ class CertReviewResponse(CertEnrollmentRequest):
review_response.request_type = json_value['requestType']
review_response.request_status = json_value['requestStatus']
review_response.request_owner = json_value['requestOwner']
- review_response.request_creation_time = json_value['requestCreationTime']
- review_response.request_modification_time = json_value['requestModificationTime']
+ review_response.request_creation_time = \
+ json_value['requestCreationTime']
+ review_response.request_modification_time = \
+ json_value['requestModificationTime']
review_response.request_notes = json_value['requestNotes']
review_response.profile_approved_by = json_value['profileApprovedBy']
review_response.profile_set_id = json_value['profileSetId']
@@ -635,18 +700,20 @@ class CertReviewResponse(CertEnrollmentRequest):
profile_policy_sets = json_value['ProfilePolicySet']
if not isinstance(profile_policy_sets, types.ListType):
- review_response.policy_sets.append(profile.ProfilePolicySet.from_json(profile_policy_sets))
+ review_response.policy_sets.append(
+ profile.ProfilePolicySet.from_json(profile_policy_sets))
else:
for policy_set in profile_policy_sets:
- review_response.policy_sets.append(profile.ProfilePolicySet.from_json(policy_set))
+ review_response.policy_sets.append(
+ profile.ProfilePolicySet.from_json(policy_set))
return review_response
class CertClient(object):
"""
- Class encapsulating and mirroring the functionality in the CertResource Java interface class
- defining the REST API for Certificate resources.
+ Class encapsulating and mirroring the functionality in the CertResource
+ Java interface class defining the REST API for Certificate resources.
"""
def __init__(self, connection):
@@ -661,101 +728,122 @@ class CertClient(object):
self.enrollment_templates = {}
@pki.handle_exceptions()
- def get_cert(self, cert_id):
+ def get_cert(self, cert_serial_number):
""" Return a CertData object for a particular certificate. """
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.cert_url + '/' + str(cert_id)
+ url = self.cert_url + '/' + str(cert_serial_number)
r = self.connection.get(url, self.headers)
return CertData.from_json(r.json())
@pki.handle_exceptions()
- def list_certs(self, max_results=None, max_time=None, start=None, size=None, **cert_search_params):
- """ Return a CertDataInfoCollection object with a information about all the
- certificates that satisfy the search criteria.
+ def list_certs(self, max_results=None, max_time=None, start=None, size=None,
+ **cert_search_params):
+ """ Return a CertDataInfoCollection object with a information about all
+ the certificates that satisfy the search criteria.
If cert_search_request=None, returns all the certificates.
"""
url = self.cert_url + '/search'
- query_params = {"maxResults": max_results, "maxTime": max_time, "start": start, "size": size}
+ query_params = {"maxResults": max_results, "maxTime": max_time,
+ "start": start, "size": size}
cert_search_request = CertSearchRequest(**cert_search_params)
- search_request = json.dumps(cert_search_request, cls=encoder.CustomTypeEncoder, sort_keys=True)
- response = self.connection.post(url, search_request, self.headers, query_params)
+ search_request = json.dumps(cert_search_request,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
+ response = self.connection.post(url, search_request, self.headers,
+ query_params)
return CertDataInfoCollection.from_json(response.json())
@pki.handle_exceptions()
- def review_cert(self, cert_id):
+ def review_cert(self, cert_serial_number):
""" Reviews a certificate. Returns a CertData object with a nonce.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.agent_cert_url + '/' + str(cert_id)
+ url = self.agent_cert_url + '/' + str(cert_serial_number)
r = self.connection.get(url, self.headers)
return CertData.from_json(r.json())
- def _submit_revoke_request(self, url, cert_id, revocation_reason=None, invalidity_date=None, comments=None,
- nonce=None):
+ def _submit_revoke_request(self, url, cert_serial_number,
+ revocation_reason=None, invalidity_date=None,
+ comments=None, nonce=None):
"""
Submits a certificate revocation request.
Expects the URL for submitting the request.
Creates a CertRevokeRequest object using the arguments provided.
- If nonce is passed as an argument, reviews the cert to get a nonce from the server
- and passes it in the request.
+ If nonce is passed as an argument, reviews the cert to get a nonce
+ from the server and passes it in the request.
Returns a CertRequestInfo object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
if url is None:
raise ValueError("URL not specified")
if nonce is None:
- cert_data = self.review_cert(cert_id)
+ cert_data = self.review_cert(cert_serial_number)
nonce = cert_data.nonce
- request = CertRevokeRequest(nonce, revocation_reason, invalidity_date, comments)
- revoke_request = json.dumps(request, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ request = CertRevokeRequest(nonce, revocation_reason, invalidity_date,
+ comments)
+ revoke_request = json.dumps(request, cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
r = self.connection.post(url, revoke_request, headers=self.headers)
return CertRequestInfo.from_json(r.json())
@pki.handle_exceptions()
- def revoke_cert(self, cert_id, revocation_reason=None, invalidity_date=None, comments=None, nonce=None):
+ def revoke_cert(self, cert_serial_number, revocation_reason=None,
+ invalidity_date=None, comments=None, nonce=None):
""" Revokes a certificate.
Returns a CertRequestInfo object with information about the request.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- url = self.agent_cert_url + '/' + str(cert_id) + '/revoke'
- return self._submit_revoke_request(url, cert_id, revocation_reason, invalidity_date, comments, nonce)
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/revoke'
+ return self._submit_revoke_request(url, cert_serial_number,
+ revocation_reason, invalidity_date,
+ comments, nonce)
@pki.handle_exceptions()
- def revoke_ca_cert(self, cert_id, revocation_reason=None, invalidity_date=None, comments=None, nonce=None):
+ def revoke_ca_cert(self, cert_serial_number, revocation_reason=None,
+ invalidity_date=None, comments=None, nonce=None):
""" Revokes a CA certificate.
Returns a CertRequestInfo object with information about the request.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- url = self.agent_cert_url + '/' + str(cert_id) + '/revoke-ca'
- return self._submit_revoke_request(url, cert_id, revocation_reason, invalidity_date, comments, nonce)
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/revoke-ca'
+ return self._submit_revoke_request(url, cert_serial_number,
+ revocation_reason, invalidity_date,
+ comments, nonce)
@pki.handle_exceptions()
- def hold_cert(self, cert_id, comments=None):
+ def hold_cert(self, cert_serial_number, comments=None):
""" Places a certificate on-hold.
- Calls the revoke_cert method with reason - CertRevokeRequest.REASON_CERTIFICATE_HOLD.
+ Calls the revoke_cert method with reason -
+ CertRevokeRequest.REASON_CERTIFICATE_HOLD.
Returns a CertRequestInfo object.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- return self.revoke_cert(cert_id, CertRevokeRequest.REASON_CERTIFICATE_HOLD, comments=comments)
+ return self.revoke_cert(cert_serial_number, 'Certificate_Hold',
+ comments=comments)
@pki.handle_exceptions()
- def unrevoke_cert(self, cert_id):
+ def unrevoke_cert(self, cert_serial_number):
""" Un-revokes a revoked certificate.
Returns a CertRequestInfo object.
- This method requires an agent's authentication cert in the connection object.
+ This method requires an agent's authentication cert in the
+ connection object.
"""
- if cert_id is None:
+ if cert_serial_number is None:
raise ValueError("Certificate ID must be specified")
- url = self.agent_cert_url + '/' + str(cert_id) + '/unrevoke'
+ url = self.agent_cert_url + '/' + str(cert_serial_number) + '/unrevoke'
r = self.connection.post(url, None, headers=self.headers)
return CertRequestInfo.from_json(r.json())
@@ -774,8 +862,9 @@ class CertClient(object):
return CertRequestInfo.from_json(r.json())
@pki.handle_exceptions()
- def list_requests(self, request_status=None, request_type=None, from_request_id=None, size=None,
- max_results=None, max_time=None):
+ def list_requests(self, request_status=None, request_type=None,
+ from_request_id=None, size=None, max_results=None,
+ max_time=None):
"""
Query for a list of certificates using the arguments passed.
Returns a CertRequestInfoCollection object.
@@ -789,7 +878,8 @@ class CertClient(object):
'maxResults': max_results,
'maxTime': max_time
}
- r = self.connection.get(self.agent_cert_requests_url, self.headers, query_params)
+ r = self.connection.get(self.agent_cert_requests_url, self.headers,
+ query_params)
return CertRequestInfoCollection.from_json(r.json())
@pki.handle_exceptions()
@@ -819,15 +909,17 @@ class CertClient(object):
cert_review_response = self.review_request(request_id)
url = self.agent_cert_requests_url + '/' + request_id + '/' + action
- review_response = json.dumps(cert_review_response, cls=encoder.CustomTypeEncoder, sort_keys=True)
+ review_response = json.dumps(cert_review_response,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
r = self.connection.post(url, review_response, headers=self.headers)
return r
def approve_request(self, request_id, cert_review_response=None):
"""
Approves a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'approve')
@@ -835,17 +927,17 @@ class CertClient(object):
def cancel_request(self, request_id, cert_review_response=None):
"""
Cancels a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'cancel')
- def reject_request(self, request_id, cert_review_response=None):
+ def reject_request(self, request_id, cert_review_response=None):
"""
Rejects a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'reject')
@@ -853,17 +945,18 @@ class CertClient(object):
def validate_request(self, request_id, cert_review_response):
"""
Validates a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
- return self._perform_action(request_id, cert_review_response, 'validate')
+ return self._perform_action(request_id, cert_review_response,
+ 'validate')
def update_request(self, request_id, cert_review_response):
"""
Updates a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'update')
@@ -871,8 +964,8 @@ class CertClient(object):
def assign_request(self, request_id, cert_review_response):
"""
Assigns a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
return self._perform_action(request_id, cert_review_response, 'assign')
@@ -880,17 +973,19 @@ class CertClient(object):
def unassign_request(self, request_id, cert_review_response):
"""
Un-assigns a certificate enrollment request.
- If cert_review_response is None, a review request operation is performed to fetch the
- CertReviewResponse object.
+ If cert_review_response is None, a review request operation is performed
+ to fetch the CertReviewResponse object.
Requires as agent level authentication.
"""
- return self._perform_action(request_id, cert_review_response, 'unassign')
+ return self._perform_action(request_id, cert_review_response,
+ 'unassign')
@pki.handle_exceptions()
def list_enrollment_templates(self, start=None, size=None):
"""
Gets the list of profile templates supported by the CA.
- The values for start and size arguments determine the starting point and the length of the list.
+ The values for start and size arguments determine the starting point and
+ the length of the list.
Returns a ProfileDataInfoCollection object.
"""
@@ -900,7 +995,6 @@ class CertClient(object):
'size': size
}
r = self.connection.get(url, self.headers, query_params)
- print r
return profile.ProfileDataInfoCollection.from_json(r.json())
@pki.handle_exceptions()
@@ -908,10 +1002,13 @@ class CertClient(object):
"""
Fetch the enrollment template for the given profile id.
For the first time, the request is sent to the server.
- The retrieved CertEnrollmentRequest object is then cached locally for future requests.
+ The retrieved CertEnrollmentRequest object is then cached locally for
+ future requests.
Returns a CerEnrollmentRequest object.
"""
+ if profile_id is None:
+ raise ValueError("Profile ID must be specified.")
if profile_id in self.enrollment_templates:
return copy.deepcopy(self.enrollment_templates[profile_id])
url = self.cert_requests_url + '/profiles/' + str(profile_id)
@@ -927,8 +1024,10 @@ class CertClient(object):
def create_enrollment_request(self, profile_id, inputs):
"""
Fetches the enrollment request object for the given profile and
- sets values to its attributes using the values provided in the inputs dictionary.
- Returns the CertEnrollmentRequest object, which can be submitted to enroll a certificate.
+ sets values to its attributes using the values provided in the inputs
+ dictionary.
+ Returns the CertEnrollmentRequest object, which can be submitted to
+ enroll a certificate.
"""
if inputs is None or len(inputs) == 0:
raise ValueError("No inputs provided.")
@@ -945,42 +1044,48 @@ class CertClient(object):
def submit_enrollment_request(self, enrollment_request):
"""
Submits the CertEnrollmentRequest object to the server.
- Returns a CertRequestInfoCollection object with information about the certificate requests
- enrolled at the CA.
+ Returns a CertRequestInfoCollection object with information about the
+ certificate requests enrolled at the CA.
"""
- request_object = json.dumps(enrollment_request, cls=encoder.CustomTypeEncoder, sort_keys=True)
- r = self.connection.post(self.cert_requests_url, request_object, self.headers)
+ request_object = json.dumps(enrollment_request,
+ cls=encoder.CustomTypeEncoder,
+ sort_keys=True)
+ r = self.connection.post(self.cert_requests_url, request_object,
+ self.headers)
return CertRequestInfoCollection.from_json(r.json())
@pki.handle_exceptions()
def enroll_cert(self, profile_id, inputs):
"""
A convenience method for enrolling a certificate for a given profile id.
- The inputs parameter should be a dictionary with values for the profile attributes
- for an enrollment request.
+ The inputs parameter should be a dictionary with values for the profile
+ attributes for an enrollment request.
- Calling this method with valid arguments, creates an enrollment request, submits it
- to the server, approves the certificate requests generated for the enrollment and
- returns a list of CertData objects for all the certificates generated as part of this
- enrollment.
+ Calling this method with valid arguments, creates an enrollment request,
+ submits it to the server, approves the certificate requests generated
+ for the enrollment and returns a list of CertData objects for all the
+ certificates generated as part of this enrollment.
- Note: This method supports only certificate enrollment where only one agent approval
- is sufficient.
+ Note: This method supports only certificate enrollment where only one
+ agent approval is sufficient.
Requires an agent level authentication.
+ Returns a list of CertData objects.
"""
- # Create a CertEnrollmentRequest object using the inputs for the given profile id.
+ # Create a CertEnrollmentRequest object using the inputs for the given
+ # profile id.
enroll_request = self.create_enrollment_request(profile_id, inputs)
# Submit the enrollment request
cert_request_infos = self.submit_enrollment_request(enroll_request)
# Approve the requests generated for the certificate enrollment.
- # Fetch the CertData objects for all the certificates created and return to the caller.
+ # Fetch the CertData objects for all the certificates created and
+ # return to the caller.
certificates = []
- for cert_request_info in cert_request_infos.cert_info_list:
+ for cert_request_info in cert_request_infos.cert_request_info_list:
request_id = cert_request_info.request_id
self.approve_request(request_id)
cert_id = self.get_request(request_id).cert_id
@@ -1010,7 +1115,8 @@ def main():
# Create a PKIConnection object that stores the details of the CA.
connection = client.PKIConnection('https', 'localhost', '8443', 'ca')
- # The pem file used for authentication. Created from a p12 file using the command -
+ # The pem file used for authentication. Created from a p12 file using the
+ # command -
# openssl pkcs12 -in <p12_file_path> -out /tmp/auth.pem -nodes
connection.set_authentication_cert("/tmp/auth.pem")
@@ -1025,13 +1131,19 @@ def main():
inputs = dict()
inputs['cert_request_type'] = 'crmf'
- inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgNVBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQAD" \
- "gY0AMIGJAoGBAK/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9UuIhGfXOtxy5vRetmDHE" \
- "9u0AopmuJbr1rL17/tSnDakpkE9umQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0eCy2a" \
- "a4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJl" \
- "Z1Rva2VuMBoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkqhkiG9w0BAQUFAAOBgQCuywnr" \
- "Dk/wGwfbguw9oVs9gzFQwM4zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6ZmDd8WpPJ+" \
- "MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REhJw=="
+ inputs['cert_request'] = "MIIBpDCCAaAwggEGAgUA5n9VYTCBx4ABAqUOMAwxCjAIBgN" \
+ "VBAMTAXimgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAK" \
+ "/SmUVoUjBtqHNw/e3OoCSXw42pdQSR53/eYJWpf7nyTbZ9U" \
+ "uIhGfXOtxy5vRetmDHE9u0AopmuJbr1rL17/tSnDakpkE9u" \
+ "mQ2lMOReLloSdX32w2xOeulUwh5BGbFpq10S0SvW1H93Vn0" \
+ "eCy2aa4UtILNEsp7JJ3FnYJibfuMPAgMBAAGpEDAOBgNVHQ" \
+ "8BAf8EBAMCBeAwMzAVBgkrBgEFBQcFAQEMCHJlZ1Rva2VuM" \
+ "BoGCSsGAQUFBwUBAgwNYXV0aGVudGljYXRvcqGBkzANBgkq" \
+ "hkiG9w0BAQUFAAOBgQCuywnrDk/wGwfbguw9oVs9gzFQwM4" \
+ "zeFbk+z82G5CWoG/4mVOT5LPL5Q8iF+KfnaU9Qcu6zZPxW6" \
+ "ZmDd8WpPJ+MTPyQl3Q5BfiKa4l5ra1NeqxMOlMiiupwINmm" \
+ "7jd1KaA2eIjuyC8/gTaO4b14R6aRaOj+Scp9cNYbthA7REh" \
+ "Jw=="
inputs['sn_uid'] = 'test12345'
inputs['sn_e'] = 'example@redhat.com'
inputs['sn_cn'] = 'TestUser'
@@ -1053,13 +1165,18 @@ def main():
inputs = dict()
inputs['cert_request_type'] = 'pkcs10'
- inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwE" \
- "wYDVQQKDAxSZWQgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wDQYJKoZIhvcNAQEBBQADgY" \
- "0AMIGJAoGBAMJpWz92dSYCvWxllrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69jQMTLZMa8" \
- "FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJmcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8y" \
- "HwV0eBx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOBgQBvkxAGKwkfK3TKwLc5Mg0IWp8zG" \
- "RVwxdIlghAL8DugNocCNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5Uje2K/+6tW2ZTGrb" \
- "izs7CNOTMzA/9H8CkHb4H9P/qRT275zHIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg=="
+ inputs['cert_request'] = "MIIBmDCCAQECAQAwWDELMAkGA1UEBhMCVVMxCzAJBgNVBAg" \
+ "MAk5DMRAwDgYDVQQHDAdSYWxlaWdoMRUwEwYDVQQKDAxSZW" \
+ "QgSGF0IEluYy4xEzARBgNVBAMMClRlc3RTZXJ2ZXIwgZ8wD" \
+ "QYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAMJpWz92dSYCvWxl" \
+ "lrQCY5atPKCswUwyppRNGPnKmJ77AdHBBI4dFyET+h/+69j" \
+ "QMTLZMa8FX7SbyHvgbgLBP4Q/RzCSE2S87qFNjriOqiQCqJ" \
+ "mcrzDzdncJQiP+O7T6MSpLo3smLP7dK1Vd7vK0Vy8yHwV0e" \
+ "Bx7DgYedv2slBPHAgMBAAGgADANBgkqhkiG9w0BAQUFAAOB" \
+ "gQBvkxAGKwkfK3TKwLc5Mg0IWp8zGRVwxdIlghAL8DugNoc" \
+ "CNNgmZazglJOOehLuk0/NkLX1ZM5RrVgM09W6kcfWZtIwr5" \
+ "Uje2K/+6tW2ZTGrbizs7CNOTMzA/9H8CkHb4H9P/qRT275z" \
+ "HIocYj4smUnXLwWGsBMeGs+OMMbGvSrHg=="
inputs['requestor_name'] = 'Tester'
inputs['requestor_email'] = 'example@redhat.com'
@@ -1080,8 +1197,8 @@ def main():
search_params = {'status': 'VALID'}
cert_data_list = cert_client.list_certs(**search_params)
- for cert_data_info in cert_data_list.cert_info_list:
- print("Serial Number: " + cert_data_info.cert_id)
+ for cert_data_info in cert_data_list:
+ print("Serial Number: " + cert_data_info.serial_number)
print("Subject DN: " + cert_data_info.subject_dn)
print("Status: " + cert_data_info.status)
print
@@ -1099,7 +1216,8 @@ def main():
# Certificate Serial Number used for CertClient methods.
# 7, 0x7 and '0x7' are also valid values
- # Following examples use the serial number of the user certificate enrolled before.
+ # Following examples use the serial number of the user certificate enrolled
+ # before.
cert_id = cert_data_infos[0].serial_number
#Get certificate data
@@ -1136,9 +1254,8 @@ def main():
print('Revoking a certificate')
print('----------------------')
- cert_request_info = cert_client.revoke_cert(cert_data.serial_number,
- revocation_reason=CertRevokeRequest.REASON_CERTIFICATE_HOLD,
- comments="Test revoking a cert", nonce=cert_data.nonce)
+ cert_request_info = cert_client.hold_cert(cert_data.serial_number,
+ comments="Test revoking a cert")
print('Request ID: ' + cert_request_info.request_id)
print('Request Type: ' + cert_request_info.request_type)
print('Request Status: ' + cert_request_info.request_status)
diff --git a/base/common/python/pki/profile.py b/base/common/python/pki/profile.py
index 83cd8bcca..34aa32eca 100644
--- a/base/common/python/pki/profile.py
+++ b/base/common/python/pki/profile.py
@@ -14,12 +14,24 @@ import pki.account as account
class ProfileDataInfo(object):
"""Stores information about a profile"""
+
def __init__(self):
self.profile_id = None
self.profile_name = None
self.profile_description = None
self.profile_url = None
+ def __repr__(self):
+ attributes = {
+ "ProfileDataInfo": {
+ 'profile_id': self.profile_id,
+ 'name': self.profile_name,
+ 'description': self.profile_description,
+ 'url': self.profile_url
+ }
+ }
+ return str(attributes)
+
@classmethod
def from_json(cls, attr_list):
profile_data_info = cls()
@@ -41,15 +53,20 @@ class ProfileDataInfoCollection(object):
self.profile_data_list = []
self.links = []
+ def __iter__(self):
+ return iter(self.profile_data_list)
+
@classmethod
def from_json(cls, json_value):
ret = cls()
profile_data_infos = json_value['entries']
if not isinstance(profile_data_infos, types.ListType):
- ret.profile_data_list.append(ProfileDataInfo.from_json(profile_data_infos))
+ ret.profile_data_list.append(
+ ProfileDataInfo.from_json(profile_data_infos))
else:
for profile_info in profile_data_infos:
- ret.profile_data_list.append(ProfileDataInfo.from_json(profile_info))
+ ret.profile_data_list.append(
+ ProfileDataInfo.from_json(profile_info))
links = json_value['Link']
if not isinstance(links, types.ListType):
@@ -64,10 +81,12 @@ class ProfileDataInfoCollection(object):
class Descriptor(object):
"""
This class represents the description of a ProfileAttribute.
- It stores information such as the syntax, constraint and default value of a profile attribute.
+ It stores information such as the syntax, constraint and default value of
+ a profile attribute.
"""
- def __init__(self, syntax=None, constraint=None, description=None, default_value=None):
+ def __init__(self, syntax=None, constraint=None, description=None,
+ default_value=None):
self.syntax = syntax
self.constraint = constraint
self.description = description
@@ -118,6 +137,7 @@ class ProfileAttribute(object):
"""
Represents a profile attribute of a ProfileInput.
"""
+
def __init__(self, name=None, value=None, descriptor=None):
self.name = name
self.value = value
@@ -158,8 +178,8 @@ class ProfileInput(object):
Ex. Subject name, Requestor Information etc.
"""
- def __init__(self, profile_input_id=None, class_id=None, name=None, text=None, attributes=None,
- config_attributes=None):
+ def __init__(self, profile_input_id=None, class_id=None, name=None,
+ text=None, attributes=None, config_attributes=None):
self.profile_input_id = profile_input_id
self.class_id = class_id
@@ -261,17 +281,21 @@ class ProfileInput(object):
attributes = json_value['Attribute']
if not isinstance(attributes, types.ListType):
- profile_input.attributes.append(ProfileAttribute.from_json(attributes))
+ profile_input.attributes.append(
+ ProfileAttribute.from_json(attributes))
else:
for profile_info in attributes:
- profile_input.attributes.append(ProfileAttribute.from_json(profile_info))
+ profile_input.attributes.append(
+ ProfileAttribute.from_json(profile_info))
config_attributes = json_value['ConfigAttribute']
if not isinstance(config_attributes, types.ListType):
- profile_input.config_attributes.append(ProfileAttribute.from_json(config_attributes))
+ profile_input.config_attributes.append(
+ ProfileAttribute.from_json(config_attributes))
else:
for config_attribute in config_attributes:
- profile_input.config_attributes.append(ProfileAttribute.from_json(config_attribute))
+ profile_input.config_attributes.append(
+ ProfileAttribute.from_json(config_attribute))
return profile_input
@@ -282,7 +306,8 @@ class ProfileOutput(object):
using a profile.
"""
- def __init__(self, profile_output_id=None, name=None, text=None, class_id=None, attributes=None):
+ def __init__(self, profile_output_id=None, name=None, text=None,
+ class_id=None, attributes=None):
self.profile_output_id = profile_output_id
self.name = name
self.text = text
@@ -332,15 +357,16 @@ class ProfileOutput(object):
profile_output.class_id = json_value['classId']
attributes = json_value['attributes']
if not isinstance(attributes, types.ListType):
- profile_output.attributes.append(ProfileAttribute.from_json(attributes))
+ profile_output.attributes.append(
+ ProfileAttribute.from_json(attributes))
else:
for profile_info in attributes:
- profile_output.attributes.append(ProfileAttribute.from_json(profile_info))
+ profile_output.attributes.append(
+ ProfileAttribute.from_json(profile_info))
return profile_output
class ProfileParameter(object):
-
def __init__(self, name=None, value=None):
self.name = name
self.value = value
@@ -355,10 +381,12 @@ class ProfileParameter(object):
class PolicyDefault(object):
"""
- An object of this class contains information of the default usage of a specific ProfileInput.
+ An object of this class contains information of the default usage of a
+ specific ProfileInput.
"""
- def __init__(self, name=None, class_id=None, description=None, policy_attributes=None, policy_params=None):
+ def __init__(self, name=None, class_id=None, description=None,
+ policy_attributes=None, policy_params=None):
self.name = name
self.class_id = class_id
self.description = description
@@ -415,24 +443,27 @@ class PolicyDefault(object):
if 'policyAttribute' in json_value:
attributes = json_value['policyAttribute']
if not isinstance(attributes, types.ListType):
- policy_def.policy_attributes.append(ProfileAttribute.from_json(attributes))
+ policy_def.policy_attributes.append(
+ ProfileAttribute.from_json(attributes))
else:
for attr in attributes:
- policy_def.policy_attributes.append(ProfileAttribute.from_json(attr))
+ policy_def.policy_attributes.append(
+ ProfileAttribute.from_json(attr))
if 'params' in json_value:
params = json_value['params']
if not isinstance(params, types.ListType):
- policy_def.policy_params.append(ProfileParameter.from_json(params))
+ policy_def.policy_params.append(
+ ProfileParameter.from_json(params))
else:
for param in params:
- policy_def.policy_params.append(ProfileParameter.from_json(param))
+ policy_def.policy_params.append(
+ ProfileParameter.from_json(param))
return policy_def
class PolicyConstraintValue(object):
-
def __init__(self, name=None, value=None, descriptor=None):
self.name = name
self.value = value
@@ -460,11 +491,12 @@ class PolicyConstraintValue(object):
class PolicyConstraint(object):
"""
- An object of this class contains the policy constraints applied to a ProfileInput
- used by a certificate enrollment request.
+ An object of this class contains the policy constraints applied to a
+ ProfileInput used by a certificate enrollment request.
"""
- def __init__(self, name=None, description=None, class_id=None, policy_constraint_values=None):
+ def __init__(self, name=None, description=None, class_id=None,
+ policy_constraint_values=None):
self.name = name
self.description = description
self.class_id = class_id
@@ -509,10 +541,12 @@ class PolicyConstraint(object):
if 'constraint' in json_value:
constraints = json_value['constraint']
if not isinstance(constraints, types.ListType):
- policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraints))
+ policy_constraint.policy_constraint_values.append(
+ PolicyConstraintValue.from_json(constraints))
else:
for constraint in constraints:
- policy_constraint.policy_constraint_values.append(PolicyConstraintValue.from_json(constraint))
+ policy_constraint.policy_constraint_values.append(
+ PolicyConstraintValue.from_json(constraint))
return policy_constraint
@@ -520,11 +554,13 @@ class PolicyConstraint(object):
class ProfilePolicy(object):
"""
This class represents the policy a profile adheres to.
- An object of this class stores the default values for profile and the constraints present on the
- values of the attributes of the profile submitted for an enrollment request.
+ An object of this class stores the default values for profile and the
+ constraints present on the values of the attributes of the profile submitted
+ for an enrollment request.
"""
- def __init__(self, policy_id=None, policy_default=None, policy_constraint=None):
+ def __init__(self, policy_id=None, policy_default=None,
+ policy_constraint=None):
self.policy_id = policy_id
self.policy_default = policy_default
self.policy_constraint = policy_constraint
@@ -563,6 +599,7 @@ class ProfilePolicySet(object):
"""
Stores a list of ProfilePolicy objects.
"""
+
def __init__(self):
self.policies = []
@@ -585,6 +622,7 @@ class PolicySet(object):
An object of this class contains a name value pair of the
policy name and the ProfilePolicy object.
"""
+
def __init__(self, name=None, policy_list=None):
self.name = name
if policy_list is None:
@@ -620,6 +658,8 @@ class PolicySet(object):
for policy in policies:
policy_set.policy_list.append(ProfilePolicy.from_json(policy))
+ return policy_set
+
class PolicySetList(object):
"""
@@ -648,7 +688,8 @@ class PolicySetList(object):
policy_set_list.policy_sets.append(PolicySet.from_json(policy_sets))
else:
for policy_set in policy_sets:
- policy_set_list.policy_sets.append(PolicySet.from_json(policy_set))
+ policy_set_list.policy_sets.append(
+ PolicySet.from_json(policy_set))
class ProfileData(object):
@@ -656,9 +697,11 @@ class ProfileData(object):
This class represents an enrollment profile.
"""
- def __init__(self, profile_id=None, class_id=None, name=None, description=None, enabled=None, visible=None,
- enabled_by=None, authenticator_id=None, authorization_acl=None, renewal=None, xml_output=None,
- inputs=None, outputs=None, policy_sets=None, link=None):
+ def __init__(self, profile_id=None, class_id=None, name=None,
+ description=None, enabled=None, visible=None, enabled_by=None,
+ authenticator_id=None, authorization_acl=None, renewal=None,
+ xml_output=None, inputs=None, outputs=None, policy_sets=None,
+ link=None):
self.profile_id = profile_id
self.name = name
@@ -779,31 +822,49 @@ class ProfileData(object):
profile_data.inputs.append(ProfileInput.from_json(profile_inputs))
else:
for profile_input in profile_inputs:
- profile_data.policy_sets.append(ProfileInput.from_json(profile_input))
+ profile_data.inputs.append(
+ ProfileInput.from_json(profile_input))
profile_outputs = json_value['Output']
if not isinstance(profile_outputs, types.ListType):
- profile_data.outputs.append(ProfileOutput.from_json(profile_outputs))
+ profile_data.outputs.append(
+ ProfileOutput.from_json(profile_outputs))
else:
for profile_output in profile_outputs:
- profile_data.policy_sets.append(ProfileOutput.from_json(profile_output))
+ profile_data.outputs.append(
+ ProfileOutput.from_json(profile_output))
policy_sets = json_value['PolicySets']
if not isinstance(policy_sets, types.ListType):
- profile_data.policy_sets.append(PolicySetList.from_json(policy_sets))
+ profile_data.policy_sets.append(
+ PolicySetList.from_json(policy_sets))
else:
for policy_set in policy_sets:
- profile_data.policy_sets.append(PolicySetList.from_json(policy_set))
+ profile_data.policy_sets.append(
+ PolicySetList.from_json(policy_set))
profile_data.link = pki.Link.from_json(json_value['link'])
return profile_data
+ def __repr__(self):
+ attributes = {
+ "ProfileData": {
+ 'profile_id': self.profile_id,
+ 'name': self.name,
+ 'description': self.description,
+ 'status': ('enabled' if self.enabled else 'disabled'),
+ 'visible': self.visible
+ }
+ }
+ return str(attributes)
+
class ProfileClient(object):
"""
This class consists of methods for accessing the ProfileResource.
"""
+
def __init__(self, connection):
self.connection = connection
self.headers = {'Content-type': 'application/json',
@@ -856,7 +917,8 @@ class ProfileClient(object):
if profile_id is None:
raise ValueError("Profile ID must be specified.")
if action is None:
- raise ValueError("A valid action(enable/disable) must be specified.")
+ raise ValueError("A valid action(enable/disable) must be "
+ "specified.")
url = self.profiles_url + '/' + str(profile_id)
params = {'action': action}
@@ -881,7 +943,8 @@ def main():
# Initialize a PKIConnection object for the CA
connection = client.PKIConnection('https', 'localhost', '8443', 'ca')
- # The pem file used for authentication. Created from a p12 file using the command -
+ # The pem file used for authentication. Created from a p12 file using the
+ # command -
# openssl pkcs12 -in <p12_file_path> -out /tmp/auth.pem -nodes
connection.set_authentication_cert("/tmp/auth.pem")
@@ -892,7 +955,7 @@ def main():
profile_data_infos = profile_client.list_profiles()
print('List of profiles:')
print('-----------------')
- for profile_data_info in profile_data_infos.profile_data_list:
+ for profile_data_info in profile_data_infos:
print(' Profile ID: ' + profile_data_info.profile_id)
print(' Profile Name: ' + profile_data_info.profile_name)
print(' Profile Description: ' + profile_data_info.profile_description)