summaryrefslogtreecommitdiffstats
path: root/base/kra/functional
diff options
context:
space:
mode:
authorChristian Heimes <cheimes@redhat.com>2015-08-11 20:55:48 +0200
committerChristian Heimes <cheimes@redhat.com>2015-08-14 13:03:18 +0200
commit12badcabc1cd345256a4902f7b0583cf667ecd8d (patch)
treed0a45d096fab9c0b14d5221557a616824ecfd24a /base/kra/functional
parentd63ade55f5cc2a9ecf21ea2b43cfac80149c4c29 (diff)
downloadpki-12badcabc1cd345256a4902f7b0583cf667ecd8d.tar.gz
pki-12badcabc1cd345256a4902f7b0583cf667ecd8d.tar.xz
pki-12badcabc1cd345256a4902f7b0583cf667ecd8d.zip
Make pki PEP 8 compatible
Large portions of the patch was automatically created with autopep8: find base/ -name '*.py' | xargs autopep8 --in-place --ignore E309 \ --aggressive find base/common/upgrade base/server/upgrade -type f -and \ -not -name .gitignore | autopep8 --in-place --ignore E309 --aggressive autopep8 --in-place --ignore E309 --aggressive \ base/common/sbin/pki-upgrade \ base/server/sbin/pkispawn \ base/server/sbin/pkidestroy \ base/server/sbin/pki-server \ base/server/sbin/pki-server-upgrade About two dozent violations were fixed manually. https://fedorahosted.org/pki/ticket/708
Diffstat (limited to 'base/kra/functional')
-rw-r--r--base/kra/functional/drmclient_deprecated.py323
-rwxr-xr-xbase/kra/functional/drmtest.py37
2 files changed, 224 insertions, 136 deletions
diff --git a/base/kra/functional/drmclient_deprecated.py b/base/kra/functional/drmclient_deprecated.py
index 602057f92..33149bed7 100644
--- a/base/kra/functional/drmclient_deprecated.py
+++ b/base/kra/functional/drmclient_deprecated.py
@@ -48,9 +48,11 @@ import base64
CERT_HEADER = "-----BEGIN NEW CERTIFICATE REQUEST-----"
CERT_FOOTER = "-----END NEW CERTIFICATE REQUEST-----"
+
def _(string):
return string
+
def parse_key_request_info_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -79,29 +81,30 @@ def parse_key_request_info_xml(doc):
request_type = doc.xpath('requestType')
if len(request_type) == 1:
request_type = etree.tostring(request_type[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['request_type'] = request_type
request_status = doc.xpath('requestStatus')
if len(request_status) == 1:
request_status = etree.tostring(request_status[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['request_status'] = request_status
request_url = doc.xpath('requestURL')
if len(request_url) == 1:
request_url = etree.tostring(request_url[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['request_id'] = request_url.rsplit('/', 1)[1]
key_url = doc.xpath('keyURL')
if len(key_url) == 1:
key_url = etree.tostring(key_url[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['key_id'] = key_url.rsplit('/', 1)[1]
return response
+
def parse_key_request_infos_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -152,6 +155,7 @@ def parse_key_request_infos_xml(doc):
return response
+
def parse_key_data_info_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -176,17 +180,18 @@ def parse_key_data_info_xml(doc):
client_id = doc.xpath('clientID')
if len(client_id) == 1:
client_id = etree.tostring(client_id[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['client_id'] = client_id
key_url = doc.xpath('keyURL')
if len(key_url) == 1:
key_url = etree.tostring(key_url[0], method='text',
- encoding=unicode).strip()
+ encoding=unicode).strip()
response['key_url'] = key_url
return response
+
def parse_key_data_infos_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -238,6 +243,7 @@ def parse_key_data_infos_xml(doc):
return response
+
def parse_key_data_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -271,6 +277,7 @@ def parse_key_data_xml(doc):
return response
+
def parse_certificate_data_xml(doc):
'''
:param doc: The root node of the xml document to parse
@@ -293,11 +300,14 @@ def parse_certificate_data_xml(doc):
if len(b64) == 1:
b64 = etree.tostring(b64[0], method='text',
encoding=unicode).strip()
- response['cert'] = b64.replace(CERT_HEADER, "").replace(CERT_FOOTER, "")
+ b64 = b64.replace(CERT_HEADER, "").replace(CERT_FOOTER, "")
+ response['cert'] = b64
return response
-def https_request(host, port, url, secdir, password, nickname, operation, args, **kw):
+
+def https_request(
+ host, port, url, secdir, password, nickname, operation, args, **kw):
"""
:param url: The URL to post to.
:param operation: GET, POST, (PUT and DELETE not yet implemented)
@@ -316,9 +326,9 @@ def https_request(host, port, url, secdir, password, nickname, operation, args,
request_headers = {"Content-type": "application/xml",
"Accept": "application/xml"}
if operation == "POST":
- if args != None:
+ if args is not None:
post = args
- elif kw != None:
+ elif kw is not None:
post = urlencode(kw)
request_headers = {"Content-type": "application/x-www-form-urlencoded",
"Accept": "text/plain"}
@@ -342,7 +352,7 @@ def https_request(host, port, url, secdir, password, nickname, operation, args,
http_reason_phrase = unicode(res.reason, 'utf-8')
http_headers = res.msg.dict
http_body = res.read()
- except Exception, e:
+ except Exception as e:
raise NetworkError(uri=uri, error=str(e))
finally:
if conn is not None:
@@ -350,6 +360,7 @@ def https_request(host, port, url, secdir, password, nickname, operation, args,
return http_status, http_reason_phrase, http_headers, http_body
+
def http_request(host, port, url, operation, args):
"""
:param url: The URL to post to.
@@ -365,16 +376,16 @@ def http_request(host, port, url, operation, args):
uri = 'http://%s%s' % (ipautil.format_netloc(host, port), url)
logging.info('request %r', uri)
request_headers = {"Content-type": "application/xml",
- "Accept": "application/xml"}
+ "Accept": "application/xml"}
if operation == "POST":
- if args != None:
+ if args is not None:
post = args
else:
post = ""
conn = httplib.HTTPConnection(host, port)
try:
if operation == "GET":
- if args != None:
+ if args is not None:
url = url + "?" + args
conn.request("GET", url)
elif operation == "POST":
@@ -386,7 +397,7 @@ def http_request(host, port, url, operation, args):
http_reason_phrase = unicode(res.reason, 'utf-8')
http_headers = res.msg.dict
http_body = res.read()
- except NSPRError, e:
+ except NSPRError as e:
raise NetworkError(uri=uri, error=str(e))
finally:
if conn is not None:
@@ -399,7 +410,8 @@ def http_request(host, port, url, operation, args):
return http_status, http_reason_phrase, http_headers, http_body
-class kra:
+
+class KRA:
"""
Key Repository Authority backend plugin.
"""
@@ -411,7 +423,6 @@ class kra:
iv = "e4:bb:3b:d3:c3:71:2e:58"
fullname = "kra"
-
def __init__(self, work_dir, kra_host, kra_port, kra_nickname):
# crypto
self.sec_dir = work_dir
@@ -427,10 +438,11 @@ class kra:
# set up key db for crypto functions
try:
nss.nss_init(self.sec_dir)
- except Exception, e:
- raise CertificateOperationError(error=_('Error in initializing certdb (%s)') \
- + e.strerror)
- self.transport_cert = nss.find_cert_from_nickname(self.transport_cert_nickname)
+ except Exception as e:
+ raise CertificateOperationError(error=_('Error in initializing certdb (%s)')
+ + e.strerror)
+ self.transport_cert = nss.find_cert_from_nickname(
+ self.transport_cert_nickname)
# DRM info
self.kra_host = kra_host
@@ -441,10 +453,14 @@ class kra:
# Get a PK11 slot based on the cipher
slot = nss.get_best_slot(mechanism)
- if sym_key == None:
- sym_key = slot.key_gen(mechanism, None, slot.get_best_key_length(mechanism))
+ if sym_key is None:
+ sym_key = slot.key_gen(
+ mechanism,
+ None,
+ slot.get_best_key_length(mechanism))
- # If initialization vector was supplied use it, otherwise set it to None
+ # If initialization vector was supplied use it, otherwise set it to
+ # None
if iv:
iv_data = nss.read_hex(iv)
iv_si = nss.SecItem(iv_data)
@@ -497,7 +513,7 @@ class kra:
Perform an HTTPS request
"""
return https_request(self.kra_host, port, url, self.sec_dir, self.password,
- self.ipa_certificate_nickname, operation, args, **kw)
+ self.ipa_certificate_nickname, operation, args, **kw)
def symmetric_wrap(self, data, wrapping_key):
"""
@@ -506,8 +522,10 @@ class kra:
Wrap (encrypt) data using the supplied symmetric key
"""
- encoding_ctx, _decoding_ctx = self.setup_contexts(self.mechanism, wrapping_key, self.iv)
- wrapped_data = encoding_ctx.cipher_op(data) + encoding_ctx.digest_final()
+ encoding_ctx, _decoding_ctx = self.setup_contexts(
+ self.mechanism, wrapping_key, self.iv)
+ wrapped_data = encoding_ctx.cipher_op(
+ data) + encoding_ctx.digest_final()
return wrapped_data
def asymmetric_wrap(self, data, wrapping_cert):
@@ -527,10 +545,12 @@ class kra:
Unwrap (decrypt) data using the supplied symmetric key
"""
- if iv == None:
+ if iv is None:
iv = self.iv
- _encoding_ctx, decoding_ctx = self.setup_contexts(self.mechanism, wrapping_key, iv)
- unwrapped_data = decoding_ctx.cipher_op(data) + decoding_ctx.digest_final()
+ _encoding_ctx, decoding_ctx = self.setup_contexts(
+ self.mechanism, wrapping_key, iv)
+ unwrapped_data = decoding_ctx.cipher_op(
+ data) + decoding_ctx.digest_final()
return unwrapped_data
def get_parse_result_xml(self, xml_text, parse_func):
@@ -546,7 +566,9 @@ class kra:
parser = etree.XMLParser()
doc = etree.fromstring(xml_text, parser)
result = parse_func(doc)
- self.debug("%s() xml_text:\n%s\nparse_result:\n%s" % (parse_func.__name__, xml_text, result))
+ self.debug(
+ "%s() xml_text:\n%s\nparse_result:\n%s" %
+ (parse_func.__name__, xml_text, result))
return result
def create_archival_request(self, client_id, security_data, data_type):
@@ -561,13 +583,16 @@ class kra:
root = etree.Element("KeyArchivalRequest")
client_id_element = etree.SubElement(root, "clientId")
client_id_element.text = client_id
- wrapped_private_data_element = etree.SubElement(root, "wrappedPrivateData")
+ wrapped_private_data_element = etree.SubElement(
+ root,
+ "wrappedPrivateData")
wrapped_private_data_element.text = security_data
data_type_element = etree.SubElement(root, "dataType")
data_type_element.text = data_type
return etree.ElementTree(root)
- def create_recovery_request(self, key_id, request_id, session_key, passphrase, nonce=None):
+ def create_recovery_request(
+ self, key_id, request_id, session_key, passphrase, nonce=None):
"""
:param key_id: identifier of key to be recovered
:param request_id: id for the recovery request
@@ -575,22 +600,26 @@ class kra:
:param passphrase passphrase wrapped in session key
:return doc: xml doc with archival request
- """
+ """
self.debug('%s.create_recovery_request()', self.fullname)
root = etree.Element("KeyRecoveryRequest")
- if key_id != None:
+ if key_id is not None:
key_id_element = etree.SubElement(root, "keyId")
key_id_element.text = key_id
- if request_id != None:
+ if request_id is not None:
request_id_element = etree.SubElement(root, "requestId")
request_id_element.text = request_id
- if session_key != None:
- session_key_element = etree.SubElement(root, "transWrappedSessionKey")
+ if session_key is not None:
+ session_key_element = etree.SubElement(
+ root,
+ "transWrappedSessionKey")
session_key_element.text = session_key
- if passphrase != None:
- passphrase_element = etree.SubElement(root, "sessionWrappedPassphrase")
+ if passphrase is not None:
+ passphrase_element = etree.SubElement(
+ root,
+ "sessionWrappedPassphrase")
passphrase_element.text = passphrase
- if nonce != None:
+ if nonce is not None:
nonce_element = etree.SubElement(root, "nonceData")
nonce_element.text = nonce
return etree.ElementTree(root)
@@ -612,24 +641,30 @@ class kra:
self.debug('%s.archive_security_data()', self.fullname)
# check clientID and security data
- if ((client_id == None) or (security_data == None)):
- raise CertificateOperationError(error=_('Bad arguments to archive_security_data'))
+ if ((client_id is None) or (security_data is None)):
+ raise CertificateOperationError(
+ error=_('Bad arguments to archive_security_data'))
- request = self.create_archival_request(client_id, security_data, data_type)
+ request = self.create_archival_request(
+ client_id,
+ security_data,
+ data_type)
# Call CMS
http_status, http_reason_phrase, _http_headers, http_body = \
self._request('/kra/rest/agent/keyrequests/archive',
- self.kra_agent_port,
- self.POST,
- etree.tostring(request.getroot(), encoding='UTF-8'))
+ self.kra_agent_port,
+ self.POST,
+ etree.tostring(request.getroot(), encoding='UTF-8'))
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in archiving request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in archiving request (%s)') %
+ http_reason_phrase)
- parse_result = self.get_parse_result_xml(http_body, parse_key_request_info_xml)
+ parse_result = self.get_parse_result_xml(
+ http_body,
+ parse_key_request_info_xml)
return parse_result
def get_transport_cert(self, etag=None):
@@ -645,17 +680,19 @@ class kra:
# Call CMS
http_status, http_reason_phrase, http_headers, http_body = \
self._request('/kra/rest/config/cert/transport',
- self.kra_agent_port,
- self.GET,
- None)
+ self.kra_agent_port,
+ self.GET,
+ None)
- self.debug("headers: %s" , http_headers)
+ self.debug("headers: %s", http_headers)
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in archiving request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in archiving request (%s)') %
+ http_reason_phrase)
- parse_result = self.get_parse_result_xml(http_body, parse_certificate_data_xml)
+ parse_result = self.get_parse_result_xml(
+ http_body,
+ parse_certificate_data_xml)
return parse_result
def list_security_data(self, client_id, key_state=None, next_id=None):
@@ -669,14 +706,15 @@ class kra:
The command returns a dict as specified in parse_key_data_infos_xml().
"""
self.debug('%s.list_security_data()', self.fullname)
- if client_id == None:
- raise CertificateOperationError(error=_('Bad argument to list_security_data'))
+ if client_id is None:
+ raise CertificateOperationError(
+ error=_('Bad argument to list_security_data'))
get_args = "clientID=" + quote_plus(client_id)
- if key_state != None:
+ if key_state is not None:
get_args = get_args + "&status=" + quote_plus(key_state)
- if next_id != None:
+ if next_id is not None:
# currnently not implemented on server
get_args = get_args + "&start=" + quote_plus(next_id)
@@ -689,10 +727,12 @@ class kra:
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in listing keys (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in listing keys (%s)') %
+ http_reason_phrase)
- parse_result = self.get_parse_result_xml(http_body, parse_key_data_infos_xml)
+ parse_result = self.get_parse_result_xml(
+ http_body,
+ parse_key_data_infos_xml)
return parse_result
def list_key_requests(self, request_state=None, request_type=None, client_id=None,
@@ -709,16 +749,16 @@ class kra:
self.debug('%s.list_key_requests()', self.fullname)
get_args = ""
- if request_state != None:
+ if request_state is not None:
get_args = get_args + "&requestState=" + quote_plus(request_state)
- if request_type != None:
+ if request_type is not None:
get_args = get_args + "&requestType=" + quote_plus(request_type)
- if client_id != None:
+ if client_id is not None:
get_args = get_args + "&clientID=" + quote_plus(client_id)
- if next_id != None:
+ if next_id is not None:
# currnently not implemented on server
get_args = get_args + "&start=" + quote_plus(next_id)
@@ -731,10 +771,12 @@ class kra:
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in listing key requests (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in listing key requests (%s)') %
+ http_reason_phrase)
- parse_result = self.get_parse_result_xml(http_body, parse_key_request_infos_xml)
+ parse_result = self.get_parse_result_xml(
+ http_body,
+ parse_key_request_infos_xml)
return parse_result
def submit_recovery_request(self, key_id):
@@ -750,24 +792,27 @@ class kra:
self.debug('%s.submit_recovery_request()', self.fullname)
# check clientID and security data
- if key_id == None:
- raise CertificateOperationError(error=_('Bad argument to archive_security_data'))
+ if key_id is None:
+ raise CertificateOperationError(
+ error=_('Bad argument to archive_security_data'))
request = self.create_recovery_request(key_id, None, None, None)
# Call CMS
http_status, http_reason_phrase, _http_headers, http_body = \
self._request('/kra/rest/agent/keyrequests/recover',
- self.kra_agent_port,
- self.POST,
- etree.tostring(request.getroot(), encoding='UTF-8'))
+ self.kra_agent_port,
+ self.POST,
+ etree.tostring(request.getroot(), encoding='UTF-8'))
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in archiving request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in archiving request (%s)') %
+ http_reason_phrase)
- parse_result = self.get_parse_result_xml(http_body, parse_key_request_info_xml)
+ parse_result = self.get_parse_result_xml(
+ http_body,
+ parse_key_request_info_xml)
return parse_result
def check_request_status(self, request_id):
@@ -800,20 +845,21 @@ class kra:
Approve recovery request
"""
self.debug('%s.approve_recovery_request()', self.fullname)
- if request_id == None:
- raise CertificateOperationError(error=_('Bad argument to approve_recovery_request'))
+ if request_id is None:
+ raise CertificateOperationError(
+ error=_('Bad argument to approve_recovery_request'))
# Call CMS
http_status, http_reason_phrase, _http_headers, _http_body = \
self._request('/kra/rest/agent/keyrequests/' + request_id + '/approve',
- self.kra_agent_port,
- self.POST,
- None)
+ self.kra_agent_port,
+ self.POST,
+ None)
# Parse and handle errors
if (http_status > 399):
- raise CertificateOperationError(error=_('Error in approving request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in approving request (%s)') %
+ http_reason_phrase)
def reject_recovery_request(self, request_id):
"""
@@ -822,20 +868,21 @@ class kra:
Reject recovery request
"""
self.debug('%s.reject_recovery_request()', self.fullname)
- if request_id == None:
- raise CertificateOperationError(error=_('Bad argument to reject_recovery_request'))
+ if request_id is None:
+ raise CertificateOperationError(
+ error=_('Bad argument to reject_recovery_request'))
# Call CMS
http_status, http_reason_phrase, _http_headers, _http_body = \
self._request('/kra/rest/agent/keyrequests/' + request_id + '/reject',
- self.kra_agent_port,
- self.POST,
- None)
+ self.kra_agent_port,
+ self.POST,
+ None)
# Parse and handle errors
if (http_status > 399):
- raise CertificateOperationError(error=_('Error in rejecting request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in rejecting request (%s)') %
+ http_reason_phrase)
def cancel_recovery_request(self, request_id):
"""
@@ -844,20 +891,21 @@ class kra:
Cancel recovery request
"""
self.debug('%s.cancel_recovery_request()', self.fullname)
- if request_id == None:
- raise CertificateOperationError(error=_('Bad argument to cancel_recovery_request'))
+ if request_id is None:
+ raise CertificateOperationError(
+ error=_('Bad argument to cancel_recovery_request'))
# Call CMS
http_status, http_reason_phrase, _http_headers, _http_body = \
self._request('/kra/rest/agent/keyrequests/' + request_id + '/cancel',
- self.kra_agent_port,
- self.POST,
- None)
+ self.kra_agent_port,
+ self.POST,
+ None)
# Parse and handle errors
if (http_status > 399):
- raise CertificateOperationError(error=_('Error in cancelling request (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in cancelling request (%s)') %
+ http_reason_phrase)
def retrieve_security_data(self, recovery_request_id, passphrase=None):
"""
@@ -881,23 +929,35 @@ class kra:
|data |String | Key data (either wrapped using |
| | | passphrase or unwrapped) |
+-----------------+---------------+---------------------------------------+
- """
+ """
self.debug('%s.retrieve_security_data()', self.fullname)
- if recovery_request_id == None:
- raise CertificateOperationError(error=_('Bad arguments to retrieve_security_data'))
+ if recovery_request_id is None:
+ raise CertificateOperationError(
+ error=_('Bad arguments to retrieve_security_data'))
# generate symmetric key
slot = nss.get_best_slot(self.mechanism)
- session_key = slot.key_gen(self.mechanism, None, slot.get_best_key_length(self.mechanism))
+ session_key = slot.key_gen(
+ self.mechanism,
+ None,
+ slot.get_best_key_length(
+ self.mechanism))
# wrap this key with the transport cert
public_key = self.transport_cert.subject_public_key_info.public_key
- wrapped_session_key = base64.b64encode(nss.pub_wrap_sym_key(self.mechanism, public_key, session_key))
+ wrapped_session_key = base64.b64encode(
+ nss.pub_wrap_sym_key(
+ self.mechanism,
+ public_key,
+ session_key))
wrapped_passphrase = None
- if passphrase != None:
+ if passphrase is not None:
# wrap passphrase with session key
- wrapped_session_key = base64.b64encode(self.symmetric_wrap(passphrase, session_key))
+ wrapped_session_key = base64.b64encode(
+ self.symmetric_wrap(
+ passphrase,
+ session_key))
request = self.create_recovery_request(None, recovery_request_id,
wrapped_session_key,
@@ -906,21 +966,24 @@ class kra:
# Call CMS
http_status, http_reason_phrase, _http_headers, http_body = \
self._request('/kra/rest/agent/keys/retrieve',
- self.kra_agent_port,
- self.POST,
- etree.tostring(request.getroot(), encoding='UTF-8'))
+ self.kra_agent_port,
+ self.POST,
+ etree.tostring(request.getroot(), encoding='UTF-8'))
# Parse and handle errors
if (http_status != 200):
- raise CertificateOperationError(error=_('Error in retrieving security data (%s)') % \
- http_reason_phrase)
+ raise CertificateOperationError(error=_('Error in retrieving security data (%s)') %
+ http_reason_phrase)
parse_result = self.get_parse_result_xml(http_body, parse_key_data_xml)
- if passphrase == None:
- iv = nss.data_to_hex(base64.decodestring(parse_result['nonce_data']))
- parse_result['data'] = self.symmetric_unwrap(base64.decodestring(parse_result['wrapped_data']),
- session_key, iv)
+ if passphrase is None:
+ iv = nss.data_to_hex(
+ base64.decodestring(
+ parse_result['nonce_data']))
+ parse_result['data'] = self.symmetric_unwrap(
+ base64.decodestring(parse_result['wrapped_data']),
+ session_key, iv)
return parse_result
@@ -945,13 +1008,26 @@ class kra:
import argparse
parser = argparse.ArgumentParser(description="Sample Test execution")
-parser.add_argument('-d', default='/tmp/drmtest', dest='work_dir', help='Working directory')
+parser.add_argument(
+ '-d',
+ default='/tmp/drmtest',
+ dest='work_dir',
+ help='Working directory')
parser.add_argument('--options', default='options.out', dest='options_file',
help='File containing test PKIArchiveOptions to be archived')
parser.add_argument('--symkey', default='symkey.out', dest='symkey_file',
help='File containing test symkey')
-parser.add_argument('--host', default='localhost', dest='kra_host', help='DRM hostname')
-parser.add_argument('-p', default='10080', type=int, dest='kra_port', help='DRM Port')
+parser.add_argument(
+ '--host',
+ default='localhost',
+ dest='kra_host',
+ help='DRM hostname')
+parser.add_argument(
+ '-p',
+ default='10080',
+ type=int,
+ dest='kra_port',
+ help='DRM Port')
parser.add_argument('-n', default='DRM TransportCert Nickname', dest='kra_nickname',
help="DRM Nickname")
@@ -963,7 +1039,7 @@ kra_nickname = args.kra_nickname
options_file = args.options_file
symkey_file = args.symkey_file
-test_kra = kra(work_dir, kra_host, kra_port, kra_nickname)
+test_kra = KRA(work_dir, kra_host, kra_port, kra_nickname)
# list requests
requests = test_kra.list_key_requests()
@@ -977,7 +1053,10 @@ print transport_cert
f = open(work_dir + "/" + options_file)
wrapped_key = f.read()
client_id = "Python symmetric key " + datetime.now().strftime("%Y-%m-%d %H:%M")
-response = test_kra.archive_security_data(client_id, wrapped_key, "symmetricKey")
+response = test_kra.archive_security_data(
+ client_id,
+ wrapped_key,
+ "symmetricKey")
print response
# list keys with client_id
@@ -999,7 +1078,7 @@ print "Testing invalid request ID"
try:
response = test_kra.retrieve_security_data("INVALID")
print "Failure: No exception thrown"
-except CertificateOperationError, e:
+except CertificateOperationError as e:
if 'Error in retrieving security data (Bad Request)' == e.error:
print "Success: " + e.error
else:
diff --git a/base/kra/functional/drmtest.py b/base/kra/functional/drmtest.py
index 0a7bbeaae..d951d5f25 100755
--- a/base/kra/functional/drmtest.py
+++ b/base/kra/functional/drmtest.py
@@ -34,7 +34,6 @@ See drmtest.readme.txt.
import base64
import getopt
-import os
import random
import shutil
import string
@@ -84,14 +83,15 @@ def print_key_data(key_data):
print "Private Data: " + base64.encodestring(key_data.data)
-def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password):
+def run_test(protocol, hostname, port, client_cert, certdb_dir,
+ certdb_password):
""" test code execution """
# set up the connection to the DRM, including authentication credentials
connection = PKIConnection(protocol, hostname, port, 'kra')
connection.set_authentication_cert(client_cert)
- #create kraclient
+ # create kraclient
crypto = pki.crypto.NSSCryptoProvider(certdb_dir, certdb_password)
kraclient = KRAClient(connection, crypto)
keyclient = kraclient.keys
@@ -207,7 +207,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
print "KeyNotFoundException thrown - Code:" + exc.code + \
" Message: " + exc.message
- #Test 13 = getKeyInfo
+ # Test 13 = getKeyInfo
print "Get key info for existing key"
key_info = keyclient.get_key_info(key_id)
print_key_info(key_info)
@@ -217,7 +217,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
key_info = keyclient.get_active_key_info(client_key_id)
print_key_info(key_info)
- #Test 15: change the key status
+ # Test 15: change the key status
print "Change the key status"
keyclient.modify_key_status(key_id, keyclient.KEY_STATUS_INACTIVE)
print_key_info(keyclient.get_key_info(key_id))
@@ -239,7 +239,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
print "ResourceNotFoundException thrown - Code: " + exc.code +\
"Message: " + exc.message
- #Test 18: Generate a symmetric key with default parameters
+ # Test 18: Generate a symmetric key with default parameters
client_key_id = "Vek #3" + time.strftime('%c')
response = keyclient.generate_symmetric_key(client_key_id)
print_key_request(response.request_info)
@@ -270,7 +270,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
print "Error: archived and recovered keys do not match"
print
- #Test 20: Generating asymmetric keys
+ # Test 20: Generating asymmetric keys
print "Generating asymmetric keys"
try:
response = keyclient.generate_asymmetric_key(
@@ -284,7 +284,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
print "BadRequestException thrown - Code:" + exc.code +\
" Message: " + exc.message
- #Test 21: Get key information of the newly generated asymmetric keys
+ # Test 21: Get key information of the newly generated asymmetric keys
print "Retrieving key information"
key_info = keyclient.get_key_info(response.request_info.get_key_id())
print_key_info(key_info)
@@ -296,7 +296,7 @@ def usage():
print ' -P <protocol> KRA server protocol (default: https).'
print ' -h <hostname> KRA server hostname (default: localhost).'
print ' -p <port> KRA server port (default: 8443).'
- print ' -n <path> KRA agent certificate and private key (default: kraagent.pem).'
+ print ' -n <path> KRA agent certificate and private key (default: kraagent.pem).' # nopep8
print
print ' --help Show this help message.'
@@ -310,9 +310,9 @@ def main(argv):
usage()
sys.exit(1)
- protocol = 'https'
- hostname = 'localhost'
- port = '8443'
+ protocol = 'https'
+ hostname = 'localhost'
+ port = '8443'
client_cert = 'kraagent.pem'
for o, a in opts:
@@ -340,11 +340,20 @@ def main(argv):
certdb_dir = tempfile.mkdtemp(prefix='pki-kra-test-')
print "NSS database dir: %s" % certdb_dir
- certdb_password = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(8))
+ certdb_password = ''.join(
+ random.choice(
+ string.ascii_letters +
+ string.digits) for i in range(8))
print "NSS database password: %s" % certdb_password
try:
- run_test(protocol, hostname, port, client_cert, certdb_dir, certdb_password)
+ run_test(
+ protocol,
+ hostname,
+ port,
+ client_cert,
+ certdb_dir,
+ certdb_password)
finally:
shutil.rmtree(certdb_dir)