diff options
author | Christian Heimes <cheimes@redhat.com> | 2015-08-11 20:55:48 +0200 |
---|---|---|
committer | Christian Heimes <cheimes@redhat.com> | 2015-08-14 13:03:18 +0200 |
commit | 12badcabc1cd345256a4902f7b0583cf667ecd8d (patch) | |
tree | d0a45d096fab9c0b14d5221557a616824ecfd24a /base/kra/functional | |
parent | d63ade55f5cc2a9ecf21ea2b43cfac80149c4c29 (diff) | |
download | pki-12badcabc1cd345256a4902f7b0583cf667ecd8d.tar.gz pki-12badcabc1cd345256a4902f7b0583cf667ecd8d.tar.xz pki-12badcabc1cd345256a4902f7b0583cf667ecd8d.zip |
Make pki PEP 8 compatible
Large portions of the patch was automatically created with autopep8:
find base/ -name '*.py' | xargs autopep8 --in-place --ignore E309 \
--aggressive
find base/common/upgrade base/server/upgrade -type f -and \
-not -name .gitignore | autopep8 --in-place --ignore E309 --aggressive
autopep8 --in-place --ignore E309 --aggressive \
base/common/sbin/pki-upgrade \
base/server/sbin/pkispawn \
base/server/sbin/pkidestroy \
base/server/sbin/pki-server \
base/server/sbin/pki-server-upgrade
About two dozent violations were fixed manually.
https://fedorahosted.org/pki/ticket/708
Diffstat (limited to 'base/kra/functional')
-rw-r--r-- | base/kra/functional/drmclient_deprecated.py | 323 | ||||
-rwxr-xr-x | base/kra/functional/drmtest.py | 37 |
2 files changed, 224 insertions, 136 deletions
diff --git a/base/kra/functional/drmclient_deprecated.py b/base/kra/functional/drmclient_deprecated.py index 602057f92..33149bed7 100644 --- a/base/kra/functional/drmclient_deprecated.py +++ b/base/kra/functional/drmclient_deprecated.py @@ -48,9 +48,11 @@ import base64 CERT_HEADER = "-----BEGIN NEW CERTIFICATE REQUEST-----" CERT_FOOTER = "-----END NEW CERTIFICATE REQUEST-----" + def _(string): return string + def parse_key_request_info_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -79,29 +81,30 @@ def parse_key_request_info_xml(doc): request_type = doc.xpath('requestType') if len(request_type) == 1: request_type = etree.tostring(request_type[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['request_type'] = request_type request_status = doc.xpath('requestStatus') if len(request_status) == 1: request_status = etree.tostring(request_status[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['request_status'] = request_status request_url = doc.xpath('requestURL') if len(request_url) == 1: request_url = etree.tostring(request_url[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['request_id'] = request_url.rsplit('/', 1)[1] key_url = doc.xpath('keyURL') if len(key_url) == 1: key_url = etree.tostring(key_url[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['key_id'] = key_url.rsplit('/', 1)[1] return response + def parse_key_request_infos_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -152,6 +155,7 @@ def parse_key_request_infos_xml(doc): return response + def parse_key_data_info_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -176,17 +180,18 @@ def parse_key_data_info_xml(doc): client_id = doc.xpath('clientID') if len(client_id) == 1: client_id = etree.tostring(client_id[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['client_id'] = client_id key_url = doc.xpath('keyURL') if len(key_url) == 1: key_url = etree.tostring(key_url[0], method='text', - encoding=unicode).strip() + encoding=unicode).strip() response['key_url'] = key_url return response + def parse_key_data_infos_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -238,6 +243,7 @@ def parse_key_data_infos_xml(doc): return response + def parse_key_data_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -271,6 +277,7 @@ def parse_key_data_xml(doc): return response + def parse_certificate_data_xml(doc): ''' :param doc: The root node of the xml document to parse @@ -293,11 +300,14 @@ def parse_certificate_data_xml(doc): if len(b64) == 1: b64 = etree.tostring(b64[0], method='text', encoding=unicode).strip() - response['cert'] = b64.replace(CERT_HEADER, "").replace(CERT_FOOTER, "") + b64 = b64.replace(CERT_HEADER, "").replace(CERT_FOOTER, "") + response['cert'] = b64 return response -def https_request(host, port, url, secdir, password, nickname, operation, args, **kw): + +def https_request( + host, port, url, secdir, password, nickname, operation, args, **kw): """ :param url: The URL to post to. :param operation: GET, POST, (PUT and DELETE not yet implemented) @@ -316,9 +326,9 @@ def https_request(host, port, url, secdir, password, nickname, operation, args, request_headers = {"Content-type": "application/xml", "Accept": "application/xml"} if operation == "POST": - if args != None: + if args is not None: post = args - elif kw != None: + elif kw is not None: post = urlencode(kw) request_headers = {"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"} @@ -342,7 +352,7 @@ def https_request(host, port, url, secdir, password, nickname, operation, args, http_reason_phrase = unicode(res.reason, 'utf-8') http_headers = res.msg.dict http_body = res.read() - except Exception, e: + except Exception as e: raise NetworkError(uri=uri, error=str(e)) finally: if conn is not None: @@ -350,6 +360,7 @@ def https_request(host, port, url, secdir, password, nickname, operation, args, return http_status, http_reason_phrase, http_headers, http_body + def http_request(host, port, url, operation, args): """ :param url: The URL to post to. @@ -365,16 +376,16 @@ def http_request(host, port, url, operation, args): uri = 'http://%s%s' % (ipautil.format_netloc(host, port), url) logging.info('request %r', uri) request_headers = {"Content-type": "application/xml", - "Accept": "application/xml"} + "Accept": "application/xml"} if operation == "POST": - if args != None: + if args is not None: post = args else: post = "" conn = httplib.HTTPConnection(host, port) try: if operation == "GET": - if args != None: + if args is not None: url = url + "?" + args conn.request("GET", url) elif operation == "POST": @@ -386,7 +397,7 @@ def http_request(host, port, url, operation, args): http_reason_phrase = unicode(res.reason, 'utf-8') http_headers = res.msg.dict http_body = res.read() - except NSPRError, e: + except NSPRError as e: raise NetworkError(uri=uri, error=str(e)) finally: if conn is not None: @@ -399,7 +410,8 @@ def http_request(host, port, url, operation, args): return http_status, http_reason_phrase, http_headers, http_body -class kra: + +class KRA: """ Key Repository Authority backend plugin. """ @@ -411,7 +423,6 @@ class kra: iv = "e4:bb:3b:d3:c3:71:2e:58" fullname = "kra" - def __init__(self, work_dir, kra_host, kra_port, kra_nickname): # crypto self.sec_dir = work_dir @@ -427,10 +438,11 @@ class kra: # set up key db for crypto functions try: nss.nss_init(self.sec_dir) - except Exception, e: - raise CertificateOperationError(error=_('Error in initializing certdb (%s)') \ - + e.strerror) - self.transport_cert = nss.find_cert_from_nickname(self.transport_cert_nickname) + except Exception as e: + raise CertificateOperationError(error=_('Error in initializing certdb (%s)') + + e.strerror) + self.transport_cert = nss.find_cert_from_nickname( + self.transport_cert_nickname) # DRM info self.kra_host = kra_host @@ -441,10 +453,14 @@ class kra: # Get a PK11 slot based on the cipher slot = nss.get_best_slot(mechanism) - if sym_key == None: - sym_key = slot.key_gen(mechanism, None, slot.get_best_key_length(mechanism)) + if sym_key is None: + sym_key = slot.key_gen( + mechanism, + None, + slot.get_best_key_length(mechanism)) - # If initialization vector was supplied use it, otherwise set it to None + # If initialization vector was supplied use it, otherwise set it to + # None if iv: iv_data = nss.read_hex(iv) iv_si = nss.SecItem(iv_data) @@ -497,7 +513,7 @@ class kra: Perform an HTTPS request """ return https_request(self.kra_host, port, url, self.sec_dir, self.password, - self.ipa_certificate_nickname, operation, args, **kw) + self.ipa_certificate_nickname, operation, args, **kw) def symmetric_wrap(self, data, wrapping_key): """ @@ -506,8 +522,10 @@ class kra: Wrap (encrypt) data using the supplied symmetric key """ - encoding_ctx, _decoding_ctx = self.setup_contexts(self.mechanism, wrapping_key, self.iv) - wrapped_data = encoding_ctx.cipher_op(data) + encoding_ctx.digest_final() + encoding_ctx, _decoding_ctx = self.setup_contexts( + self.mechanism, wrapping_key, self.iv) + wrapped_data = encoding_ctx.cipher_op( + data) + encoding_ctx.digest_final() return wrapped_data def asymmetric_wrap(self, data, wrapping_cert): @@ -527,10 +545,12 @@ class kra: Unwrap (decrypt) data using the supplied symmetric key """ - if iv == None: + if iv is None: iv = self.iv - _encoding_ctx, decoding_ctx = self.setup_contexts(self.mechanism, wrapping_key, iv) - unwrapped_data = decoding_ctx.cipher_op(data) + decoding_ctx.digest_final() + _encoding_ctx, decoding_ctx = self.setup_contexts( + self.mechanism, wrapping_key, iv) + unwrapped_data = decoding_ctx.cipher_op( + data) + decoding_ctx.digest_final() return unwrapped_data def get_parse_result_xml(self, xml_text, parse_func): @@ -546,7 +566,9 @@ class kra: parser = etree.XMLParser() doc = etree.fromstring(xml_text, parser) result = parse_func(doc) - self.debug("%s() xml_text:\n%s\nparse_result:\n%s" % (parse_func.__name__, xml_text, result)) + self.debug( + "%s() xml_text:\n%s\nparse_result:\n%s" % + (parse_func.__name__, xml_text, result)) return result def create_archival_request(self, client_id, security_data, data_type): @@ -561,13 +583,16 @@ class kra: root = etree.Element("KeyArchivalRequest") client_id_element = etree.SubElement(root, "clientId") client_id_element.text = client_id - wrapped_private_data_element = etree.SubElement(root, "wrappedPrivateData") + wrapped_private_data_element = etree.SubElement( + root, + "wrappedPrivateData") wrapped_private_data_element.text = security_data data_type_element = etree.SubElement(root, "dataType") data_type_element.text = data_type return etree.ElementTree(root) - def create_recovery_request(self, key_id, request_id, session_key, passphrase, nonce=None): + def create_recovery_request( + self, key_id, request_id, session_key, passphrase, nonce=None): """ :param key_id: identifier of key to be recovered :param request_id: id for the recovery request @@ -575,22 +600,26 @@ class kra: :param passphrase passphrase wrapped in session key :return doc: xml doc with archival request - """ + """ self.debug('%s.create_recovery_request()', self.fullname) root = etree.Element("KeyRecoveryRequest") - if key_id != None: + if key_id is not None: key_id_element = etree.SubElement(root, "keyId") key_id_element.text = key_id - if request_id != None: + if request_id is not None: request_id_element = etree.SubElement(root, "requestId") request_id_element.text = request_id - if session_key != None: - session_key_element = etree.SubElement(root, "transWrappedSessionKey") + if session_key is not None: + session_key_element = etree.SubElement( + root, + "transWrappedSessionKey") session_key_element.text = session_key - if passphrase != None: - passphrase_element = etree.SubElement(root, "sessionWrappedPassphrase") + if passphrase is not None: + passphrase_element = etree.SubElement( + root, + "sessionWrappedPassphrase") passphrase_element.text = passphrase - if nonce != None: + if nonce is not None: nonce_element = etree.SubElement(root, "nonceData") nonce_element.text = nonce return etree.ElementTree(root) @@ -612,24 +641,30 @@ class kra: self.debug('%s.archive_security_data()', self.fullname) # check clientID and security data - if ((client_id == None) or (security_data == None)): - raise CertificateOperationError(error=_('Bad arguments to archive_security_data')) + if ((client_id is None) or (security_data is None)): + raise CertificateOperationError( + error=_('Bad arguments to archive_security_data')) - request = self.create_archival_request(client_id, security_data, data_type) + request = self.create_archival_request( + client_id, + security_data, + data_type) # Call CMS http_status, http_reason_phrase, _http_headers, http_body = \ self._request('/kra/rest/agent/keyrequests/archive', - self.kra_agent_port, - self.POST, - etree.tostring(request.getroot(), encoding='UTF-8')) + self.kra_agent_port, + self.POST, + etree.tostring(request.getroot(), encoding='UTF-8')) # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in archiving request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in archiving request (%s)') % + http_reason_phrase) - parse_result = self.get_parse_result_xml(http_body, parse_key_request_info_xml) + parse_result = self.get_parse_result_xml( + http_body, + parse_key_request_info_xml) return parse_result def get_transport_cert(self, etag=None): @@ -645,17 +680,19 @@ class kra: # Call CMS http_status, http_reason_phrase, http_headers, http_body = \ self._request('/kra/rest/config/cert/transport', - self.kra_agent_port, - self.GET, - None) + self.kra_agent_port, + self.GET, + None) - self.debug("headers: %s" , http_headers) + self.debug("headers: %s", http_headers) # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in archiving request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in archiving request (%s)') % + http_reason_phrase) - parse_result = self.get_parse_result_xml(http_body, parse_certificate_data_xml) + parse_result = self.get_parse_result_xml( + http_body, + parse_certificate_data_xml) return parse_result def list_security_data(self, client_id, key_state=None, next_id=None): @@ -669,14 +706,15 @@ class kra: The command returns a dict as specified in parse_key_data_infos_xml(). """ self.debug('%s.list_security_data()', self.fullname) - if client_id == None: - raise CertificateOperationError(error=_('Bad argument to list_security_data')) + if client_id is None: + raise CertificateOperationError( + error=_('Bad argument to list_security_data')) get_args = "clientID=" + quote_plus(client_id) - if key_state != None: + if key_state is not None: get_args = get_args + "&status=" + quote_plus(key_state) - if next_id != None: + if next_id is not None: # currnently not implemented on server get_args = get_args + "&start=" + quote_plus(next_id) @@ -689,10 +727,12 @@ class kra: # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in listing keys (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in listing keys (%s)') % + http_reason_phrase) - parse_result = self.get_parse_result_xml(http_body, parse_key_data_infos_xml) + parse_result = self.get_parse_result_xml( + http_body, + parse_key_data_infos_xml) return parse_result def list_key_requests(self, request_state=None, request_type=None, client_id=None, @@ -709,16 +749,16 @@ class kra: self.debug('%s.list_key_requests()', self.fullname) get_args = "" - if request_state != None: + if request_state is not None: get_args = get_args + "&requestState=" + quote_plus(request_state) - if request_type != None: + if request_type is not None: get_args = get_args + "&requestType=" + quote_plus(request_type) - if client_id != None: + if client_id is not None: get_args = get_args + "&clientID=" + quote_plus(client_id) - if next_id != None: + if next_id is not None: # currnently not implemented on server get_args = get_args + "&start=" + quote_plus(next_id) @@ -731,10 +771,12 @@ class kra: # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in listing key requests (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in listing key requests (%s)') % + http_reason_phrase) - parse_result = self.get_parse_result_xml(http_body, parse_key_request_infos_xml) + parse_result = self.get_parse_result_xml( + http_body, + parse_key_request_infos_xml) return parse_result def submit_recovery_request(self, key_id): @@ -750,24 +792,27 @@ class kra: self.debug('%s.submit_recovery_request()', self.fullname) # check clientID and security data - if key_id == None: - raise CertificateOperationError(error=_('Bad argument to archive_security_data')) + if key_id is None: + raise CertificateOperationError( + error=_('Bad argument to archive_security_data')) request = self.create_recovery_request(key_id, None, None, None) # Call CMS http_status, http_reason_phrase, _http_headers, http_body = \ self._request('/kra/rest/agent/keyrequests/recover', - self.kra_agent_port, - self.POST, - etree.tostring(request.getroot(), encoding='UTF-8')) + self.kra_agent_port, + self.POST, + etree.tostring(request.getroot(), encoding='UTF-8')) # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in archiving request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in archiving request (%s)') % + http_reason_phrase) - parse_result = self.get_parse_result_xml(http_body, parse_key_request_info_xml) + parse_result = self.get_parse_result_xml( + http_body, + parse_key_request_info_xml) return parse_result def check_request_status(self, request_id): @@ -800,20 +845,21 @@ class kra: Approve recovery request """ self.debug('%s.approve_recovery_request()', self.fullname) - if request_id == None: - raise CertificateOperationError(error=_('Bad argument to approve_recovery_request')) + if request_id is None: + raise CertificateOperationError( + error=_('Bad argument to approve_recovery_request')) # Call CMS http_status, http_reason_phrase, _http_headers, _http_body = \ self._request('/kra/rest/agent/keyrequests/' + request_id + '/approve', - self.kra_agent_port, - self.POST, - None) + self.kra_agent_port, + self.POST, + None) # Parse and handle errors if (http_status > 399): - raise CertificateOperationError(error=_('Error in approving request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in approving request (%s)') % + http_reason_phrase) def reject_recovery_request(self, request_id): """ @@ -822,20 +868,21 @@ class kra: Reject recovery request """ self.debug('%s.reject_recovery_request()', self.fullname) - if request_id == None: - raise CertificateOperationError(error=_('Bad argument to reject_recovery_request')) + if request_id is None: + raise CertificateOperationError( + error=_('Bad argument to reject_recovery_request')) # Call CMS http_status, http_reason_phrase, _http_headers, _http_body = \ self._request('/kra/rest/agent/keyrequests/' + request_id + '/reject', - self.kra_agent_port, - self.POST, - None) + self.kra_agent_port, + self.POST, + None) # Parse and handle errors if (http_status > 399): - raise CertificateOperationError(error=_('Error in rejecting request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in rejecting request (%s)') % + http_reason_phrase) def cancel_recovery_request(self, request_id): """ @@ -844,20 +891,21 @@ class kra: Cancel recovery request """ self.debug('%s.cancel_recovery_request()', self.fullname) - if request_id == None: - raise CertificateOperationError(error=_('Bad argument to cancel_recovery_request')) + if request_id is None: + raise CertificateOperationError( + error=_('Bad argument to cancel_recovery_request')) # Call CMS http_status, http_reason_phrase, _http_headers, _http_body = \ self._request('/kra/rest/agent/keyrequests/' + request_id + '/cancel', - self.kra_agent_port, - self.POST, - None) + self.kra_agent_port, + self.POST, + None) # Parse and handle errors if (http_status > 399): - raise CertificateOperationError(error=_('Error in cancelling request (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in cancelling request (%s)') % + http_reason_phrase) def retrieve_security_data(self, recovery_request_id, passphrase=None): """ @@ -881,23 +929,35 @@ class kra: |data |String | Key data (either wrapped using | | | | passphrase or unwrapped) | +-----------------+---------------+---------------------------------------+ - """ + """ self.debug('%s.retrieve_security_data()', self.fullname) - if recovery_request_id == None: - raise CertificateOperationError(error=_('Bad arguments to retrieve_security_data')) + if recovery_request_id is None: + raise CertificateOperationError( + error=_('Bad arguments to retrieve_security_data')) # generate symmetric key slot = nss.get_best_slot(self.mechanism) - session_key = slot.key_gen(self.mechanism, None, slot.get_best_key_length(self.mechanism)) + session_key = slot.key_gen( + self.mechanism, + None, + slot.get_best_key_length( + self.mechanism)) # wrap this key with the transport cert public_key = self.transport_cert.subject_public_key_info.public_key - wrapped_session_key = base64.b64encode(nss.pub_wrap_sym_key(self.mechanism, public_key, session_key)) + wrapped_session_key = base64.b64encode( + nss.pub_wrap_sym_key( + self.mechanism, + public_key, + session_key)) wrapped_passphrase = None - if passphrase != None: + if passphrase is not None: # wrap passphrase with session key - wrapped_session_key = base64.b64encode(self.symmetric_wrap(passphrase, session_key)) + wrapped_session_key = base64.b64encode( + self.symmetric_wrap( + passphrase, + session_key)) request = self.create_recovery_request(None, recovery_request_id, wrapped_session_key, @@ -906,21 +966,24 @@ class kra: # Call CMS http_status, http_reason_phrase, _http_headers, http_body = \ self._request('/kra/rest/agent/keys/retrieve', - self.kra_agent_port, - self.POST, - etree.tostring(request.getroot(), encoding='UTF-8')) + self.kra_agent_port, + self.POST, + etree.tostring(request.getroot(), encoding='UTF-8')) # Parse and handle errors if (http_status != 200): - raise CertificateOperationError(error=_('Error in retrieving security data (%s)') % \ - http_reason_phrase) + raise CertificateOperationError(error=_('Error in retrieving security data (%s)') % + http_reason_phrase) parse_result = self.get_parse_result_xml(http_body, parse_key_data_xml) - if passphrase == None: - iv = nss.data_to_hex(base64.decodestring(parse_result['nonce_data'])) - parse_result['data'] = self.symmetric_unwrap(base64.decodestring(parse_result['wrapped_data']), - session_key, iv) + if passphrase is None: + iv = nss.data_to_hex( + base64.decodestring( + parse_result['nonce_data'])) + parse_result['data'] = self.symmetric_unwrap( + base64.decodestring(parse_result['wrapped_data']), + session_key, iv) return parse_result @@ -945,13 +1008,26 @@ class kra: import argparse parser = argparse.ArgumentParser(description="Sample Test execution") -parser.add_argument('-d', default='/tmp/drmtest', dest='work_dir', help='Working directory') +parser.add_argument( + '-d', + default='/tmp/drmtest', + dest='work_dir', + help='Working directory') parser.add_argument('--options', default='options.out', dest='options_file', help='File containing test PKIArchiveOptions to be archived') parser.add_argument('--symkey', default='symkey.out', dest='symkey_file', help='File containing test symkey') -parser.add_argument('--host', default='localhost', dest='kra_host', help='DRM hostname') -parser.add_argument('-p', default='10080', type=int, dest='kra_port', help='DRM Port') +parser.add_argument( + '--host', + default='localhost', + dest='kra_host', + help='DRM hostname') +parser.add_argument( + '-p', + default='10080', + type=int, + dest='kra_port', + help='DRM Port') parser.add_argument('-n', default='DRM TransportCert Nickname', dest='kra_nickname', help="DRM Nickname") @@ -963,7 +1039,7 @@ kra_nickname = args.kra_nickname options_file = args.options_file symkey_file = args.symkey_file -test_kra = kra(work_dir, kra_host, kra_port, kra_nickname) +test_kra = KRA(work_dir, kra_host, kra_port, kra_nickname) # list requests requests = test_kra.list_key_requests() @@ -977,7 +1053,10 @@ print transport_cert f = open(work_dir + "/" + options_file) wrapped_key = f.read() client_id = "Python symmetric key " + datetime.now().strftime("%Y-%m-%d %H:%M") -response = test_kra.archive_security_data(client_id, wrapped_key, "symmetricKey") +response = test_kra.archive_security_data( + client_id, + wrapped_key, + "symmetricKey") print response # list keys with client_id @@ -999,7 +1078,7 @@ print "Testing invalid request ID" try: response = test_kra.retrieve_security_data("INVALID") print "Failure: No exception thrown" -except CertificateOperationError, e: +except CertificateOperationError as e: if 'Error in retrieving security data (Bad Request)' == e.error: print "Success: " + e.error else: diff --git a/base/kra/functional/drmtest.py b/base/kra/functional/drmtest.py index 0a7bbeaae..d951d5f25 100755 --- a/base/kra/functional/drmtest.py +++ b/base/kra/functional/drmtest.py @@ -34,7 +34,6 @@ See drmtest.readme.txt. import base64 import getopt -import os import random import shutil import string @@ -84,14 +83,15 @@ def print_key_data(key_data): print "Private Data: " + base64.encodestring(key_data.data) -def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password): +def run_test(protocol, hostname, port, client_cert, certdb_dir, + certdb_password): """ test code execution """ # set up the connection to the DRM, including authentication credentials connection = PKIConnection(protocol, hostname, port, 'kra') connection.set_authentication_cert(client_cert) - #create kraclient + # create kraclient crypto = pki.crypto.NSSCryptoProvider(certdb_dir, certdb_password) kraclient = KRAClient(connection, crypto) keyclient = kraclient.keys @@ -207,7 +207,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) print "KeyNotFoundException thrown - Code:" + exc.code + \ " Message: " + exc.message - #Test 13 = getKeyInfo + # Test 13 = getKeyInfo print "Get key info for existing key" key_info = keyclient.get_key_info(key_id) print_key_info(key_info) @@ -217,7 +217,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) key_info = keyclient.get_active_key_info(client_key_id) print_key_info(key_info) - #Test 15: change the key status + # Test 15: change the key status print "Change the key status" keyclient.modify_key_status(key_id, keyclient.KEY_STATUS_INACTIVE) print_key_info(keyclient.get_key_info(key_id)) @@ -239,7 +239,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) print "ResourceNotFoundException thrown - Code: " + exc.code +\ "Message: " + exc.message - #Test 18: Generate a symmetric key with default parameters + # Test 18: Generate a symmetric key with default parameters client_key_id = "Vek #3" + time.strftime('%c') response = keyclient.generate_symmetric_key(client_key_id) print_key_request(response.request_info) @@ -270,7 +270,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) print "Error: archived and recovered keys do not match" print - #Test 20: Generating asymmetric keys + # Test 20: Generating asymmetric keys print "Generating asymmetric keys" try: response = keyclient.generate_asymmetric_key( @@ -284,7 +284,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) print "BadRequestException thrown - Code:" + exc.code +\ " Message: " + exc.message - #Test 21: Get key information of the newly generated asymmetric keys + # Test 21: Get key information of the newly generated asymmetric keys print "Retrieving key information" key_info = keyclient.get_key_info(response.request_info.get_key_id()) print_key_info(key_info) @@ -296,7 +296,7 @@ def usage(): print ' -P <protocol> KRA server protocol (default: https).' print ' -h <hostname> KRA server hostname (default: localhost).' print ' -p <port> KRA server port (default: 8443).' - print ' -n <path> KRA agent certificate and private key (default: kraagent.pem).' + print ' -n <path> KRA agent certificate and private key (default: kraagent.pem).' # nopep8 print print ' --help Show this help message.' @@ -310,9 +310,9 @@ def main(argv): usage() sys.exit(1) - protocol = 'https' - hostname = 'localhost' - port = '8443' + protocol = 'https' + hostname = 'localhost' + port = '8443' client_cert = 'kraagent.pem' for o, a in opts: @@ -340,11 +340,20 @@ def main(argv): certdb_dir = tempfile.mkdtemp(prefix='pki-kra-test-') print "NSS database dir: %s" % certdb_dir - certdb_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(8)) + certdb_password = ''.join( + random.choice( + string.ascii_letters + + string.digits) for i in range(8)) print "NSS database password: %s" % certdb_password try: - run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password) + run_test( + protocol, + hostname, + port, + client_cert, + certdb_dir, + certdb_password) finally: shutil.rmtree(certdb_dir) |