diff options
author | Christian Heimes <cheimes@redhat.com> | 2015-09-21 14:44:50 +0200 |
---|---|---|
committer | Christian Heimes <cheimes@redhat.com> | 2015-10-01 20:02:59 +0200 |
commit | 344caf335d94e7aa3a32bdd850ed1363088c896d (patch) | |
tree | 25407b09ad8a79e59f0446c47d3debc9b3adddfa | |
parent | 4f5051463ea9dc1366a2b58b9814c0e7997c1813 (diff) | |
download | pki-344caf335d94e7aa3a32bdd850ed1363088c896d.tar.gz pki-344caf335d94e7aa3a32bdd850ed1363088c896d.tar.xz pki-344caf335d94e7aa3a32bdd850ed1363088c896d.zip |
Replace legacy Python base64 invocations with Py3-safe code
Replace deprecated decodestring() and encodestring() with b64decode()
and b64encode().
Provice specialized encode_cert() / decode_cert() functions to handle
base64 encoding and decoding for X.509 certs in JSON strings. In Python
3 the base64 function don't suppor ASCII text, just ASCII bytes.
-rw-r--r-- | base/common/python/pki/encoder.py | 30 | ||||
-rw-r--r-- | base/common/python/pki/key.py | 18 | ||||
-rw-r--r-- | base/common/python/pki/systemcert.py | 5 | ||||
-rw-r--r-- | base/kra/functional/drmclient_deprecated.py | 16 | ||||
-rwxr-xr-x | base/kra/functional/drmtest.py | 20 |
5 files changed, 62 insertions, 27 deletions
diff --git a/base/common/python/pki/encoder.py b/base/common/python/pki/encoder.py index bf5d2e473..f83060103 100644 --- a/base/common/python/pki/encoder.py +++ b/base/common/python/pki/encoder.py @@ -1,11 +1,41 @@ from __future__ import absolute_import + +import base64 import json + +import six from six import iteritems, itervalues TYPES = {} NOTYPES = {} +def encode_cert(data): + """base64 encode X.509 certificate + + Python 3's base64.b64encode() doesn't support ASCII text. + + :param data: data as bytes or ASCII text + :type data: str, bytes + :rtype: bytes + """ + if isinstance(data, six.text_type): + data = data.encode('ascii') + return base64.b64encode(data) + + +def decode_cert(data): + """base64 decode X.509 certificate + + :param data: data as bytes or ASCII text + :type data: str, bytes + :rtype: bytes + """ + if isinstance(data, six.text_type): + data = data.encode('ascii') + return base64.b64decode(data) + + class CustomTypeEncoder(json.JSONEncoder): """ A custom JSONEncoder class that knows how to encode core custom diff --git a/base/common/python/pki/key.py b/base/common/python/pki/key.py index 1204be54a..4a6f50bdd 100644 --- a/base/common/python/pki/key.py +++ b/base/common/python/pki/key.py @@ -89,9 +89,9 @@ class Key(object): def __init__(self, key_data): """ Constructor """ - self.encrypted_data = base64.decodestring( + self.encrypted_data = base64.b64decode( key_data.wrapped_private_data) - self.nonce_data = base64.decodestring(key_data.nonce_data) + self.nonce_data = base64.b64decode(key_data.nonce_data) self.algorithm = key_data.algorithm self.size = key_data.size @@ -133,7 +133,7 @@ class KeyInfo(object): else: setattr(key_info, k, v) if key_info.public_key is not None: - key_info.public_key = base64.decodestring(key_info.public_key) + key_info.public_key = encoder.decode_cert(key_info.public_key) return key_info def get_key_id(self): @@ -584,7 +584,7 @@ class KeyClient(object): raise TypeError("Must specify Client Key ID") if trans_wrapped_session_key is not None: - twsk = base64.encodestring(trans_wrapped_session_key) + twsk = base64.b64encode(trans_wrapped_session_key) # noinspection PyUnusedLocal request = SymKeyGenerationRequest( client_key_id=client_key_id, @@ -764,9 +764,9 @@ class KeyClient(object): if not nonce_iv: raise TypeError('Missing nonce IV') - data = base64.encodestring(encrypted_data) - twsk = base64.encodestring(wrapped_session_key) - symkey_params = base64.encodestring(nonce_iv) + data = base64.b64encode(encrypted_data) + twsk = base64.b64encode(wrapped_session_key) + symkey_params = base64.b64encode(nonce_iv) request = KeyArchivalRequest(client_key_id=client_key_id, data_type=data_type, @@ -806,7 +806,7 @@ class KeyClient(object): if pki_archive_options is None: raise TypeError("No data provided to be archived") - data = base64.encodestring(pki_archive_options) + data = base64.b64encode(pki_archive_options) request = KeyArchivalRequest(client_key_id=client_key_id, data_type=data_type, pki_archive_options=data, @@ -915,7 +915,7 @@ class KeyClient(object): request = KeyRecoveryRequest( key_id=key_id, request_id=request_id, - trans_wrapped_session_key=base64.encodestring( + trans_wrapped_session_key=base64.b64encode( trans_wrapped_session_key)) key = self.retrieve_key_data(request) diff --git a/base/common/python/pki/systemcert.py b/base/common/python/pki/systemcert.py index 4adc2f18e..199838b9e 100644 --- a/base/common/python/pki/systemcert.py +++ b/base/common/python/pki/systemcert.py @@ -22,9 +22,10 @@ Module containing the Python client classes for the SystemCert REST API """ from __future__ import absolute_import -import base64 + import pki from pki.cert import CertData +from pki.encoder import decode_cert class SystemCertClient(object): @@ -55,6 +56,6 @@ class SystemCertClient(object): pem = cert_data.encoded b64 = pem[len(pki.CERT_HEADER):len(pem) - len(pki.CERT_FOOTER)] - cert_data.binary = base64.decodestring(b64) + cert_data.binary = decode_cert(b64) return cert_data diff --git a/base/kra/functional/drmclient_deprecated.py b/base/kra/functional/drmclient_deprecated.py index e558073f8..6d06d7405 100644 --- a/base/kra/functional/drmclient_deprecated.py +++ b/base/kra/functional/drmclient_deprecated.py @@ -44,7 +44,6 @@ from ipalib.errors import NetworkError, CertificateOperationError from urllib import urlencode, quote_plus from datetime import datetime import logging -import base64 import six from six.moves import http_client # pylint: disable=F0401 @@ -52,6 +51,9 @@ CERT_HEADER = "-----BEGIN NEW CERTIFICATE REQUEST-----" CERT_FOOTER = "-----END NEW CERTIFICATE REQUEST-----" +from base64 import b64decode, b64encode + + def _(string): return string @@ -949,7 +951,7 @@ class KRA: # wrap this key with the transport cert public_key = self.transport_cert.subject_public_key_info.public_key - wrapped_session_key = base64.b64encode( + wrapped_session_key = b64encode( nss.pub_wrap_sym_key( self.mechanism, public_key, @@ -957,7 +959,7 @@ class KRA: wrapped_passphrase = None if passphrase is not None: # wrap passphrase with session key - wrapped_session_key = base64.b64encode( + wrapped_session_key = b64encode( self.symmetric_wrap( passphrase, session_key)) @@ -982,10 +984,10 @@ class KRA: if passphrase is None: iv = nss.data_to_hex( - base64.decodestring( + b64decode( parse_result['nonce_data'])) parse_result['data'] = self.symmetric_unwrap( - base64.decodestring(parse_result['wrapped_data']), + b64decode(parse_result['wrapped_data']), session_key, iv) return parse_result @@ -1090,14 +1092,14 @@ except CertificateOperationError as e: # retrieve key response = test_kra.retrieve_security_data(request_id) print(response) -print("retrieved data is " + base64.encodestring(response['data'])) +print("retrieved data is " + b64encode(response['data'])) # read original symkey from file f = open(work_dir + "/" + symkey_file) orig_key = f.read() print("orig key is " + orig_key) -if orig_key.strip() == base64.encodestring(response['data']).strip(): +if orig_key.strip() == b64encode(response['data']).strip(): print("Success: the keys match") else: print("Failure: keys do not match") diff --git a/base/kra/functional/drmtest.py b/base/kra/functional/drmtest.py index dd44c374b..6853987cf 100755 --- a/base/kra/functional/drmtest.py +++ b/base/kra/functional/drmtest.py @@ -34,7 +34,7 @@ See drmtest.readme.txt. from __future__ import absolute_import from __future__ import print_function -import base64 + import getopt import random import shutil @@ -42,6 +42,7 @@ import string import sys import tempfile import time +from base64 import b64decode, b64encode from six.moves import range # pylint: disable=W0622,F0401 @@ -50,6 +51,7 @@ import pki.crypto import pki.key as key from pki.client import PKIConnection +from pki.encoder import encode_cert from pki.kra import KRAClient @@ -72,7 +74,7 @@ def print_key_info(key_info): if key_info.public_key is not None: print("Public key: ") print() - pub_key = base64.encodestring(key_info.public_key) + pub_key = encode_cert(key_info.public_key) print(pub_key) @@ -80,11 +82,11 @@ def print_key_data(key_data): """ Prints the relevant fields of a KeyData object """ print("Key Algorithm: " + str(key_data.algorithm)) print("Key Size: " + str(key_data.size)) - print("Nonce Data: " + base64.encodestring(key_data.nonce_data)) + print("Nonce Data: " + b64encode(key_data.nonce_data)) print("Wrapped Private Data: " + - base64.encodestring(key_data.encrypted_data)) + b64encode(key_data.encrypted_data)) if key_data.data is not None: - print("Private Data: " + base64.encodestring(key_data.data)) + print("Private Data: " + b64encode(key_data.data)) def run_test(protocol, hostname, port, client_cert, certdb_dir, @@ -169,12 +171,12 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, unwrapped_key = crypto.symmetric_unwrap(key_data.encrypted_data, session_key, nonce_iv=key_data.nonce_data) - key1 = base64.encodestring(unwrapped_key) + key1 = b64encode(unwrapped_key) # Test 7: Recover key without providing trans_wrapped_session_key key_data = keyclient.retrieve_key(key_id) print_key_data(key_data) - key2 = base64.encodestring(key_data.data) + key2 = b64encode(key_data.data) # Test 8 - Confirm that keys returned are the same if key1 == key2: @@ -255,7 +257,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, response = keyclient.archive_key(client_key_id, keyclient.SYMMETRIC_KEY_TYPE, - base64.decodestring(key1), + b64decode(key1), key_algorithm=keyclient.AES_ALGORITHM, key_size=128) print_key_request(response.request_info) @@ -266,7 +268,7 @@ def run_test(protocol, hostname, port, client_cert, certdb_dir, key_data = keyclient.retrieve_key(key_info.get_key_id()) print_key_data(key_data) - key2 = base64.encodestring(key_data.data) + key2 = b64encode(key_data.data) if key1 == key2: print("Success: archived and recovered keys match") |