-rw-r--r-- | base/common/python/pki/cert.py | 376 | ||||
-rw-r--r-- | base/common/python/pki/encoder.py | 13 | ||||
-rw-r--r-- | base/common/python/pki/key.py | 72 | ||||
-rw-r--r-- | base/common/python/pki/kra.py (renamed from base/common/python/pki/kraclient.py) | 0 | ||||
-rw-r--r-- | base/kra/functional/drmtest.py | 73 | ||||
-rw-r--r-- | base/kra/functional/drmtest.readme.txt | 6 |
6 files changed, 223 insertions, 317 deletions
diff --git a/base/common/python/pki/cert.py b/base/common/python/pki/cert.py index 036bbf4e3..f0f429a90 100644 --- a/base/common/python/pki/cert.py +++ b/base/common/python/pki/cert.py @@ -1,8 +1,25 @@ #!/usr/bin/python """ -Created on Feb 13, 2014 + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program; if not, write to the Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + + Copyright (C) 2014 Red Hat, Inc. + All rights reserved. + +Authors: + Abhishek Koneru <akoneru@redhat.com> + Ade Lee <alee@redhat.com> -@author: akoneru """ import copy import json @@ -19,8 +36,17 @@ class CertData(object): Class containing certificate data as returned from getCert() """ + json_attribute_names = { + 'id': 'serial_number', 'IssuerDN': 'issuer_dn', + 'SubjectDN': 'subject_dn', 'PrettyPrint': 'pretty_repr', + 'Encoded': 'encoded', 'NotBefore': 'not_before', + 'NotAfter': 'not_after', 'Status': 'status', 'Nonce': 'nonce', + 'Link': 'link', 'PKCS7CertChain': 'pkcs7_cert_chain' + } + def __init__(self): - """ Constructor """ + """Constructor""" + self.serial_number = None self.issuer_dn = None self.subject_dn = None 
@@ -47,20 +73,17 @@ class CertData(object): def from_json(cls, attr_list): """ Return CertData object from JSON dict """ cert_data = cls() - cert_data.serial_number = attr_list['id'] - cert_data.issuer_dn = attr_list['IssuerDN'] - cert_data.subject_dn = attr_list['SubjectDN'] - cert_data.pretty_repr = attr_list['PrettyPrint'] - cert_data.encoded = attr_list['Encoded'] - cert_data.pkcs7_cert_chain = attr_list['PKCS7CertChain'] - cert_data.not_before = attr_list['NotBefore'] - cert_data.not_after = attr_list['NotAfter'] - cert_data.status = attr_list['Status'] - cert_data.link = pki.Link.from_json(attr_list['Link']) - - #Special case. Only returned when reviewing a cert. - if 'Nonce' in attr_list: - cert_data.nonce = attr_list['Nonce'] + + for k, v in attr_list.items(): + if k not in ['Link']: + if k in CertData.json_attribute_names: + setattr(cert_data, CertData.json_attribute_names[k], v) + else: + setattr(cert_data, k, v) + + if 'Link' in attr_list: + cert_data.link = pki.Link.from_json(attr_list['Link']) + return cert_data @@ -70,6 +93,14 @@ class CertDataInfo(object): This data is returned when searching/listing certificate records. """ + json_attribute_names = { + 'id': 'serial_number', 'SubjectDN': 'subject_dn', 'Status': 'status', + 'Type': 'type', 'Version': 'version', 'KeyLength': 'key_length', + 'KeyAlgorithmOID': 'key_algorithm_oid', 'Link': 'link', + 'NotValidBefore': 'not_valid_before', + 'NotValidAfter': 'not_valid_after', 'IssuedOn': 'issued_on', + 'IssuedBy': 'issued_by'} + def __init__(self): """ Constructor """ self.serial_number = None 
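Review bot: The `else: setattr(cert_data, k, v)` branch in `CertData.from_json` copies every key of the server's JSON onto the object, so a response field named like a method or an existing attribute (for example `from_json`) shadows it on that instance. A field the old code required can also now be missing without any error. Because this client parses responses that carry nonces, certificates and wrapped key material, I would assign only keys listed in the map and ignore or log the rest, e.g. `if k in cls.json_attribute_names: setattr(cert_data, cls.json_attribute_names[k], v)`. The same fall-through appears in the other `from_json` hunks of cert.py and in `KeyData`, `KeyInfo` and `KeyRequestInfo` in key.py. Fields that currently depend on it (`algorithm`, `size`, `status`, lowercase `nonce`) would then need identity entries in their maps.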
@@ -99,18 +130,16 @@ class CertDataInfo(object): def from_json(cls, attr_list): """ Return CertDataInfo object from JSON dict """ cert_data_info = cls() - cert_data_info.serial_number = attr_list['id'] - cert_data_info.subject_dn = attr_list['SubjectDN'] - cert_data_info.status = attr_list['Status'] - cert_data_info.type = attr_list['Type'] - cert_data_info.version = attr_list['Version'] - cert_data_info.key_algorithm_oid = attr_list['KeyAlgorithmOID'] - cert_data_info.key_length = attr_list['KeyLength'] - cert_data_info.not_valid_before = attr_list['NotValidBefore'] - cert_data_info.not_valid_after = attr_list['NotValidAfter'] - cert_data_info.issued_on = attr_list['IssuedOn'] - cert_data_info.issued_by = attr_list['IssuedBy'] - cert_data_info.link = pki.Link.from_json(attr_list['Link']) + for k, v in attr_list.items(): + if k not in ['Link']: + if k in CertDataInfo.json_attribute_names: + setattr(cert_data_info, + CertDataInfo.json_attribute_names[k], v) + else: + setattr(cert_data_info, k, v) + + if 'Link' in attr_list: + cert_data_info.link = pki.Link.from_json(attr_list['Link']) return cert_data_info @@ -157,6 +186,12 @@ class CertRequestInfo(object): An object of this class stores represents a certificate request. """ + json_attribute_names = { + 'requestType': 'request_type', 'requestURL': 'request_url', + 'requestStatus': 'request_status', 'certId': 'cert_id', + 'operationResult': 'operation_result', 'certURL': 'cert_url', + 'errorMessage': 'error_message', 'certRequestType': 'cert_request_type' + } def __init__(self): """ Constructor """ 
@@ -184,22 +219,18 @@ class CertRequestInfo(object): @classmethod def from_json(cls, attr_list): cert_request_info = cls() - cert_request_info.request_type = attr_list['requestType'] - cert_request_info.request_url = attr_list['requestURL'] - cert_request_info.request_status = attr_list['requestStatus'] - cert_request_info.operation_result = attr_list['operationResult'] + + for k, v in attr_list.items(): + if k not in ['Link']: + if k in CertRequestInfo.json_attribute_names: + setattr(cert_request_info, + CertRequestInfo.json_attribute_names[k], v) + else: + setattr(cert_request_info, k, v) + cert_request_info.request_id = \ str(cert_request_info.request_url)[(str( cert_request_info.request_url).rfind("/") + 1):] - #Optional parameters - if 'certId' in attr_list: - cert_request_info.cert_id = attr_list['certId'] - if 'certURL' in attr_list: - cert_request_info.cert_url = attr_list['certURL'] - if 'certRequestType' in attr_list: - cert_request_info.cert_request_type = attr_list['certRequestType'] - if 'errorMessage' in attr_list: - cert_request_info.error_message = attr_list['errorMessage'] return cert_request_info @@ -376,6 +407,13 @@ class CertEnrollmentRequest(object): enrollment request. """ + json_attribute_names = { + 'ProfileID': 'profile_id', 'Renewal': 'renewal', + 'SerialNumber': 'serial_number', 'RemoteHost': 'remote_host', + 'RemoteAddress': 'remote_address', 'Input': 'inputs', + 'Output': 'outputs' + } + def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None, inputs=None, outputs=None): 
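Review bot: In `CertRequestInfo.from_json` the `request_id` slice now runs even when the response carries no `requestURL`. The old code raised `KeyError` on `attr_list['requestURL']`, whereas the new loop leaves `request_url` at its constructor value; if that is `None`, `str(None)` makes `request_id` the literal string `"None"`. Guard the slice with `if cert_request_info.request_url is not None:` or raise on the missing field. The constructor body is outside the hunk, so the `None` default is inferred from the sibling classes.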
@@ -387,64 +425,12 @@ class CertEnrollmentRequest(object): self.remote_address = remote_address if inputs is None: self.inputs = [] + else: + self.inputs = inputs if outputs is None: self.outputs = [] - - @property - def profile_id(self): - return getattr(self, 'ProfileID', None) - - @profile_id.setter - def profile_id(self, value): - setattr(self, 'ProfileID', value) - - @property - def renewal(self): - return getattr(self, 'Renewal', False) - - @renewal.setter - def renewal(self, value): - setattr(self, 'Renewal', value) - - @property - def serial_number(self): - return getattr(self, 'SerialNumber', None) - - @serial_number.setter - def serial_number(self, value): - setattr(self, 'SerialNumber', value) - - @property - def remote_host(self): - return getattr(self, 'RemoteHost', None) - - @remote_host.setter - def remote_host(self, value): - setattr(self, 'RemoteHost', value) - - @property - def remote_address(self): - return getattr(self, 'RemoteAddress', None) - - @remote_address.setter - def remote_address(self, value): - setattr(self, 'RemoteAddress', value) - - @property - def inputs(self): - return getattr(self, 'Input') - - @inputs.setter - def inputs(self, value): - setattr(self, 'Input', value) - - @property - def outputs(self): - return getattr(self, 'Output') - - @outputs.setter - def outputs(self, value): - setattr(self, 'Output', value) + else: + self.outputs = outputs def add_input(self, profile_input): self.inputs.append(profile_input) 
@@ -479,19 +465,19 @@ class CertEnrollmentRequest(object): return None @classmethod - def from_json(cls, json_value): + def from_json(cls, attr_list): + enroll_request = cls() - enroll_request.profile_id = json_value['ProfileID'] - enroll_request.renewal = json_value['Renewal'] - if 'SerialNumber' in json_value: - enroll_request.serial_number = json_value['SerialNumber'] - if 'RemoteHost' in json_value: - enroll_request.remote_host = json_value['RemoteHost'] - if 'RemoteAddress' in json_value: - enroll_request.remote_address = json_value['RemoteAddress'] + for k, v in attr_list.items(): + if k not in ['Input', 'Output']: + if k in CertEnrollmentRequest.json_attribute_names: + setattr(enroll_request, + CertEnrollmentRequest.json_attribute_names[k], v) + else: + setattr(enroll_request, k, v) - inputs = json_value['Input'] + inputs = attr_list['Input'] if not isinstance(inputs, types.ListType): enroll_request.inputs.append(profile.ProfileInput.from_json(inputs)) else: @@ -499,7 +485,7 @@ class CertEnrollmentRequest(object): enroll_request.inputs.append( profile.ProfileInput.from_json(profile_input)) - outputs = json_value['Output'] + outputs = attr_list['Output'] if not isinstance(outputs, types.ListType): enroll_request.outputs.append( profile.ProfileOutput.from_json(outputs)) 
@@ -518,6 +504,23 @@ class CertReviewResponse(CertEnrollmentRequest): It contains a nonce required to perform action on the request. """ + json_attribute_names = dict( + CertEnrollmentRequest.json_attribute_names.items() + { + 'requestId': 'request_id', 'requestType': 'request_type', + 'requestStatus': 'request_status', 'requestOwner': 'request_owner', + 'requestCreationTime': 'request_creation_time', + 'requestNotes': 'request_notes', + 'requestModificationTime': 'request_modification_time', + 'profileApprovedBy': 'profile_approved_by', + 'profileSetId': 'profile_set_id', 'profileName': 'profile_name', + 'profileIsVisible': 'profile_is_visible', + 'profileDescription': 'profile_description', + 'profileRemoteHost': 'profile_remote_host', + 'profileRemoteAddr': 'profile_remote_address', + 'ProfilePolicySet': 'policy_sets' + }.items() + ) + def __init__(self, profile_id=None, renewal=False, serial_number=None, remote_host=None, remote_address=None, inputs=None, outputs=None, nonce=None, request_id=None, request_type=None, 
@@ -554,151 +557,22 @@ class CertReviewResponse(CertEnrollmentRequest): else: self.policy_sets = policy_sets - @property - def request_id(self): - return getattr(self, 'requestId') - - @request_id.setter - def request_id(self, value): - setattr(self, 'requestId', value) - - @property - def request_type(self): - return getattr(self, 'requestType') - - @request_type.setter - def request_type(self, value): - setattr(self, 'requestType', value) - - @property - def request_status(self): - return getattr(self, 'requestStatus') - - @request_status.setter - def request_status(self, value): - setattr(self, 'requestStatus', value) - - @property - def request_owner(self): - return getattr(self, 'requestOwner') - - @request_owner.setter - def request_owner(self, value): - setattr(self, 'requestOwner', value) - - @property - def request_creation_time(self): - return getattr(self, 'requestCreationTime') - - @request_creation_time.setter - def request_creation_time(self, value): - setattr(self, 'requestCreationTime', value) - - @property - def request_modification_time(self): - return getattr(self, 'requestModificationTime') - - @request_modification_time.setter - def request_modification_time(self, value): - setattr(self, 'requestModificationTime', value) - - @property - def request_notes(self): - return getattr(self, 'requestNotes') - - @request_notes.setter - def request_notes(self, value): - setattr(self, 'requestNotes', value) - - @property - def profile_approved_by(self): - return getattr(self, 'profileApprovedBy') - - @profile_approved_by.setter - def profile_approved_by(self, value): - setattr(self, 'profileApprovedBy', value) - - @property - def profile_set_id(self): - return getattr(self, 'profileSetId') - - @profile_set_id.setter - def profile_set_id(self, value): - setattr(self, 'profileSetId', value) - - @property - def profile_is_visible(self): - return getattr(self, 'profileIsVisible') - - @profile_is_visible.setter - def profile_is_visible(self, value): - 
setattr(self, 'profileIsVisible', value) - - @property - def profile_name(self): - return getattr(self, 'profileName') - - @profile_name.setter - def profile_name(self, value): - setattr(self, 'profileName', value) - - @property - def profile_description(self): - return getattr(self, 'profileDescription') - - @profile_description.setter - def profile_description(self, value): - setattr(self, 'profileDescription', value) - - @property - def profile_remote_host(self): - return getattr(self, 'profileRemoteHost') - - @profile_remote_host.setter - def profile_remote_host(self, value): - setattr(self, 'profileRemoteHost', value) - - @property - def profile_remote_address(self): - return getattr(self, 'profileRemoteAddr') - - @profile_remote_address.setter - def profile_remote_address(self, value): - setattr(self, 'profileRemoteAddr', value) - - @property - def policy_sets(self): - return getattr(self, 'ProfilePolicySet') - - @policy_sets.setter - def policy_sets(self, value): - setattr(self, 'ProfilePolicySet', value) - @classmethod - def from_json(cls, json_value): + def from_json(cls, attr_list): #First read the values for attributes defined in CertEnrollmentRequest - review_response = super(CertReviewResponse, cls).from_json(json_value) - - review_response.nonce = json_value['nonce'] - review_response.request_id = json_value['requestId'] - review_response.request_type = json_value['requestType'] - review_response.request_status = json_value['requestStatus'] - review_response.request_owner = json_value['requestOwner'] - review_response.request_creation_time = \ - json_value['requestCreationTime'] - review_response.request_modification_time = \ - json_value['requestModificationTime'] - review_response.request_notes = json_value['requestNotes'] - review_response.profile_approved_by = json_value['profileApprovedBy'] - review_response.profile_set_id = json_value['profileSetId'] - review_response.profile_is_visible = json_value['profileIsVisible'] - 
review_response.profile_name = json_value['profileName'] - review_response.profile_description = json_value['profileDescription'] - review_response.profile_remote_host = json_value['profileRemoteHost'] - review_response.profile_remote_address = json_value['profileRemoteAddr'] - - profile_policy_sets = json_value['ProfilePolicySet'] + review_response = super(CertReviewResponse, cls).from_json(attr_list) + + for k, v in attr_list.items(): + if k not in ['ProfilePolicySet'] and k not in \ + CertEnrollmentRequest.json_attribute_names: + if k in CertReviewResponse.json_attribute_names: + setattr(review_response, + CertReviewResponse.json_attribute_names[k], v) + else: + setattr(review_response, k, v) + + profile_policy_sets = attr_list['ProfilePolicySet'] if not isinstance(profile_policy_sets, types.ListType): review_response.policy_sets.append( profile.ProfilePolicySet.from_json(profile_policy_sets)) diff --git a/base/common/python/pki/encoder.py b/base/common/python/pki/encoder.py index 0ed194d0d..06a23250e 100644 --- a/base/common/python/pki/encoder.py +++ b/base/common/python/pki/encoder.py @@ -36,9 +36,20 @@ class CustomTypeEncoder(json.JSONEncoder): return {k: obj.__dict__} for k, v in NOTYPES.items(): if isinstance(obj, v): - return obj.__dict__ + return self.attr_name_conversion(obj.__dict__, v) return json.JSONEncoder.default(self, obj) + @staticmethod + def attr_name_conversion(attr_dict, object_class): + if not hasattr(object_class, 'json_attribute_names'): + return attr_dict + for k, v in object_class.json_attribute_names.items(): + if v in attr_dict: + value = attr_dict[v] + del attr_dict[v] + attr_dict[k] = value + return attr_dict + def CustomTypeDecoder(dct): if len(dct) == 1: diff --git a/base/common/python/pki/key.py b/base/common/python/pki/key.py index 5a24c2a31..048cc0d41 100644 --- a/base/common/python/pki/key.py +++ b/base/common/python/pki/key.py 
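Review bot: `CertReviewResponse.from_json` ends up storing each review field twice. The `super(CertReviewResponse, cls).from_json(attr_list)` call runs the base-class loop, which consults only `CertEnrollmentRequest.json_attribute_names`, so `requestId`, `requestStatus`, `ProfilePolicySet` and the rest hit its `else` branch and are set under their raw JSON names before the second loop adds the snake_case copies. Looking the map up through `cls` in the base class (`cls.json_attribute_names`) would resolve the subclass names in one pass and make most of the second loop unnecessary. The base-class method is in an earlier hunk, and this method continues past the end of the hunk.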
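Review bot: `attr_name_conversion` receives `obj.__dict__` itself and runs `del attr_dict[v]`, so serialising an object strips its Python-named attributes from the live instance and leaves the JSON-named ones in their place. After one encode, a caller reading `obj.request_url` gets `AttributeError`. Pass a copy instead: `return self.attr_name_conversion(dict(obj.__dict__), v)`. The enclosing method (presumably `default`) starts above the hunk.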
@@ -1,7 +1,4 @@ #!/usr/bin/python -# Authors: -# Abhishek Koneru <akoneru@redhat.com> -# Ade Lee <alee@redhat.com> # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -19,6 +16,10 @@ # Copyright (C) 2013 Red Hat, Inc. # All rights reserved. # +# Authors: +# Abhishek Koneru <akoneru@redhat.com> +# Ade Lee <alee@redhat.com> +# """ Module containing the Python client classes for the KeyClient and KeyRequestClient REST API on a DRM @@ -32,17 +33,6 @@ import pki import pki.encoder as encoder -#pylint: disable-msg=R0903 -class KeyId(object): - """ - Class representing a key ID - """ - - def __init__(self, key_id=None): - """ Constructor """ - self.value = key_id - - #should be moved to request.py #pylint: disable-msg=R0903 class RequestId(object): @@ -63,6 +53,10 @@ class KeyData(object): to send information of the key in the key retrieval requests. """ + json_attribute_names = { + 'nonceData': 'nonce_data', 'wrappedPrivateData': 'wrapped_private_data' + } + # pylint: disable-msg=C0103 def __init__(self): """ Constructor """ @@ -75,10 +69,11 @@ class KeyData(object): def from_json(cls, attr_list): """ Return a KeyData object from a JSON dict """ key_data = cls() - key_data.algorithm = attr_list['algorithm'] - key_data.nonce_data = attr_list['nonceData'] - key_data.size = attr_list['size'] - key_data.wrapped_private_data = attr_list['wrappedPrivateData'] + for k, v in attr_list.items(): + if k in KeyData.json_attribute_names: + setattr(key_data, KeyData.json_attribute_names[k], v) + else: + setattr(key_data, k, v) return key_data @@ -108,6 +103,11 @@ class KeyInfo(object): contain the secret itself. """ + json_attribute_names = { + 'clientKeyID': 'client_key_id', 'keyURL': 'key_url', + 'ownerName': 'owner_name' + } + # pylint: disable-msg=C0103 def __init__(self): """ Constructor """ 
@@ -122,12 +122,11 @@ class KeyInfo(object): def from_json(cls, attr_list): """ Return KeyInfo from JSON dict """ key_info = cls() - key_info.client_key_id = attr_list['clientKeyID'] - key_info.key_url = attr_list['keyURL'] - key_info.algorithm = attr_list['algorithm'] - key_info.status = attr_list['status'] - key_info.owner_name = attr_list['ownerName'] - key_info.size = attr_list['size'] + for k, v in attr_list.items(): + if k in KeyInfo.json_attribute_names: + setattr(key_info, KeyInfo.json_attribute_names[k], v) + else: + setattr(key_info, k, v) return key_info def get_key_id(self): @@ -169,6 +168,11 @@ class KeyRequestInfo(object): key generation etc.) in the DRM. """ + json_attribute_names = { + 'requestURL': 'request_url', 'requestType': 'request_type', + 'keyURL': 'key_url', 'requestStatus': 'request_status' + } + # pylint: disable-msg=C0103 def __init__(self): """ Constructor """ 
@@ -181,27 +185,27 @@ class KeyRequestInfo(object): def from_json(cls, attr_list): """ Return a KeyRequestInfo object from a JSON dict. """ key_request_info = cls() - key_request_info.request_url = attr_list['requestURL'] - key_request_info.request_type = attr_list['requestType'] - - if 'keyURL' in attr_list: - key_request_info.key_url = attr_list['keyURL'] + for k, v in attr_list.items(): + if k in KeyRequestInfo.json_attribute_names: + setattr(key_request_info, + KeyRequestInfo.json_attribute_names[k], v) + else: + setattr(key_request_info, k, v) - key_request_info.request_status = attr_list['requestStatus'] return key_request_info def get_request_id(self): """ Return the request ID by parsing the request URL. """ if self.request_url is not None: - indx = str(self.request_url).rfind("/") + 1 - return str(self.request_url)[indx:] + index = str(self.request_url).rfind("/") + 1 + return str(self.request_url)[index:] return None def get_key_id(self): """ Return the ID of the secret referred to by this request. """ if self.key_url is not None: - indx = str(self.key_url).rfind("/") + 1 - return str(self.key_url)[indx:] + index = str(self.key_url).rfind("/") + 1 + return str(self.key_url)[index:] return None diff --git a/base/common/python/pki/kraclient.py b/base/common/python/pki/kra.py index e3daabdd3..e3daabdd3 100644 --- a/base/common/python/pki/kraclient.py +++ b/base/common/python/pki/kra.py diff --git a/base/kra/functional/drmtest.py b/base/kra/functional/drmtest.py index 0fff95c2e..9ef096194 100644 --- a/base/kra/functional/drmtest.py +++ b/base/kra/functional/drmtest.py 
@@ -1,21 +1,23 @@ -# Authors: -# Ade Lee <alee@redhat.com> -# -# Copyright (C) 2012 Red Hat -# see file 'COPYING' for use and warranty information +#!/usr/bin/python # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Copyright (C) 2013 Red Hat, Inc. +# All rights reserved. +# +# Authors: +# Ade Lee <alee@redhat.com> """ ========================================================================= @@ -36,7 +38,7 @@ import pki.key as key import time from pki.client import PKIConnection -from pki.kraclient import KRAClient +from pki.kra import KRAClient def print_key_request(request): @@ -44,7 +46,7 @@ def print_key_request(request): print "RequestURL: " + str(request.request_url) print "RequestType: " + str(request.request_type) print "RequestStatus: " + str(request.request_status) - print "KeyURL: " + str(request.keyURL) + print "KeyURL: " + str(request.key_url) def print_key_info(key_info): 
@@ -62,7 +64,8 @@ def print_key_data(key_data): print "Key Algorithm: " + str(key_data.algorithm) print "Key Size: " + str(key_data.size) print "Nonce Data: " + base64.encodestring(key_data.nonce_data) - print "Wrapped Private Data: " + base64.encodestring(key_data.encrypted_data) + print "Wrapped Private Data: " + \ + base64.encodestring(key_data.encrypted_data) if key_data.data is not None: print "Private Data: " + base64.encodestring(key_data.data) @@ -72,12 +75,13 @@ def main(): # set up the connection to the DRM, including authentication credentials connection = PKIConnection('https', 'localhost', '8443', 'kra') - connection.set_authentication_cert('/tmp/temp4.pem') + connection.set_authentication_cert('/tmp/auth.pem') # create an NSS DB for crypto operations certdb_dir = "/tmp/drmtest-certdb" certdb_password = "redhat123" - cryptoutil.NSSCryptoUtil.setup_database(certdb_dir, certdb_password, over_write=True) + cryptoutil.NSSCryptoUtil.setup_database(certdb_dir, certdb_password, + over_write=True) #create kraclient crypto = cryptoutil.NSSCryptoUtil(certdb_dir, certdb_password) @@ -87,7 +91,9 @@ def main(): # Get transport cert and insert in the certdb transport_nick = "kra transport cert" transport_cert = kraclient.system_certs.get_transport_cert() - tcert = transport_cert[len(pki.CERT_HEADER):len(transport_cert) - len(pki.CERT_FOOTER)] + print transport_cert + tcert = transport_cert[len(pki.CERT_HEADER):len(transport_cert) - len( + pki.CERT_FOOTER)] crypto.import_cert(transport_nick, base64.decodestring(tcert), "u,u,u") # initialize the certdb for crypto operations 
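Review bot: The renamed client-certificate path `'/tmp/auth.pem'` in `main()` is still a fixed file name in a world-writable directory, and step 2 of the readme suggests this file holds the exported DRM admin credential. On a shared host another local user may be able to read it, or place their own file at that name before the test runs. Take the path from a command-line argument or environment variable and default to a directory only the test user can write, and treat `certdb_dir` the same way. `main()` starts and ends outside this hunk.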
@@ -117,18 +123,20 @@ def main(): client_key_id = "Vek #1" + time.strftime('%c') algorithm = "AES" key_size = 128 - usages = [key.SymKeyGenerationRequest.DECRYPT_USAGE, key.SymKeyGenerationRequest.ENCRYPT_USAGE] + usages = [key.SymKeyGenerationRequest.DECRYPT_USAGE, + key.SymKeyGenerationRequest.ENCRYPT_USAGE] response = keyclient.generate_symmetric_key(client_key_id, algorithm=algorithm, size=key_size, usages=usages) - print_key_request(response.requestInfo) - print "Request ID is " + response.requestInfo.get_request_id() + print_key_request(response.request_info) + print "Request ID is " + response.request_info.get_request_id() key_id = response.get_key_id() # Test 5: Confirm the key_id matches print "Now getting key ID for clientKeyID=\"" + client_key_id + "\"" - key_infos = keyclient.list_keys(client_key_id=client_key_id, status=keyclient.KEY_STATUS_ACTIVE) + key_infos = keyclient.list_keys(client_key_id=client_key_id, + status=keyclient.KEY_STATUS_ACTIVE) key_id2 = None for key_info in key_infos.key_infos: print_key_info(key_info) @@ -138,11 +146,14 @@ def main(): else: print "Failure - key_ids for generation do not match!" - # Test 6: Barbican_decode() - Retrieve while providing trans_wrapped_session_key + # Test 6: Barbican_decode() - Retrieve while providing + # trans_wrapped_session_key session_key = crypto.generate_session_key() - wrapped_session_key = crypto.asymmetric_wrap(session_key, keyclient.transport_cert) + wrapped_session_key = crypto.asymmetric_wrap(session_key, + keyclient.transport_cert) print "My key id is " + str(key_id) - key_data = keyclient.retrieve_key(key_id, trans_wrapped_session_key=wrapped_session_key) + key_data = keyclient.retrieve_key( + key_id, trans_wrapped_session_key=wrapped_session_key) print_key_data(key_data) unwrapped_key = crypto.symmetric_unwrap(key_data.encrypted_data, session_key, 
@@ -170,21 +181,24 @@ def main(): size=key_size, usages=usages) except pki.BadRequestException as exc: - print "BadRequestException thrown - Code:" + exc.code + " Message: " + exc.message + print "BadRequestException thrown - Code:" + exc.code +\ + " Message: " + exc.message # Test 11 - Test RequestNotFoundException on get_request_info print "Try to list a nonexistent request" try: keyclient.get_request_info('200000034') except pki.RequestNotFoundException as exc: - print "RequestNotFoundException thrown - Code:" + exc.code + " Message: " + exc.message + print "RequestNotFoundException thrown - Code:" + exc.code +\ + " Message: " + exc.message # Test 12 - Test exception on retrieve_key. print "Try to retrieve an invalid key" try: keyclient.retrieve_key('2000003434') except pki.KeyNotFoundException as exc: - print "KeyNotFoundException thrown - Code:" + exc.code + " Message: " + exc.message + print "KeyNotFoundException thrown - Code:" + exc.code + \ + " Message: " + exc.message #Test 13 = getKeyInfo print "Get key info for existing key" @@ -206,7 +220,8 @@ def main(): try: keyclient.get_key_info('200004556') except pki.KeyNotFoundException as exc: - print "KeyNotFoundException thrown - Code:" + exc.code + " Message: " + exc.message + print "KeyNotFoundException thrown - Code:" + exc.code +\ + " Message: " + exc.message # Test 17: Get key info for non-existent active key print "Get non-existent active key" @@ -214,7 +229,8 @@ def main(): key_info = keyclient.get_active_key_info(client_key_id) print_key_info(key_info) except pki.ResourceNotFoundException as exc: - print "ResourceNotFoundException thrown - Code: " + exc.code + "Message: " + exc.message + print "ResourceNotFoundException thrown - Code: " + exc.code +\ + "Message: " + exc.message #Test 18: Generate a symmetric key with default parameters client_key_id = "Vek #3" + time.strftime('%c') 
@@ -226,7 +242,8 @@ def main(): print "key to archive: " + key1 client_key_id = "Vek #4" + time.strftime('%c') - response = keyclient.archive_key(client_key_id, keyclient.SYMMETRIC_KEY_TYPE, + response = keyclient.archive_key(client_key_id, + keyclient.SYMMETRIC_KEY_TYPE, base64.decodestring(key1), key_algorithm=keyclient.AES_ALGORITHM, key_size=128) diff --git a/base/kra/functional/drmtest.readme.txt b/base/kra/functional/drmtest.readme.txt index 4e5c5f308..46debaa2b 100644 --- a/base/kra/functional/drmtest.readme.txt +++ b/base/kra/functional/drmtest.readme.txt @@ -1,6 +1,6 @@ You will need to set up a few things first though: -1. Install a CA/KRA. It this is not on the default ports, you will -need to modify the connection information in KRAClient.__main__ +1. Install a CA/KRA. If this is not on the default ports, you will +need to modify the connection information in drmtest.__main__ 2. The python code uses python-requests to talk to the server, and requests uses openssl. That means you need to export your DRM admin @@ -25,4 +25,4 @@ chmod +r /tmp/drmtest/certdb/* certutil -L -d /var/lib/pki/pki-tomcat/alias/ -n "transportCert cert-pki-tomcat KRA" -a > transport_cert.txt certutil -A -d /tmp/drmtest/certdb/ -n "kra transport cert" -i ./transport_cert.txt -a -t "u,u,u" -4. Then just run kraclient.__main__ with no arguments.
\ No newline at end of file +4. Then just run drmtest.__main__ with no arguments.
\ No newline at end of file |