From 7a131df18b4cbaa78929df1a1419f25898489815 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Sun, 8 Mar 2015 17:26:04 -0400 Subject: Add more checks, algorithms, tests --- jwcrypto/jwe.py | 181 +++++++++++++++++++++++++++++++++++++++++++++++++++--- jwcrypto/jwk.py | 4 ++ jwcrypto/tests.py | 149 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 321 insertions(+), 13 deletions(-) diff --git a/jwcrypto/jwe.py b/jwcrypto/jwe.py index 6bea310..40f8edb 100644 --- a/jwcrypto/jwe.py +++ b/jwcrypto/jwe.py @@ -31,6 +31,22 @@ JWEHeaderRegistry = {'alg': ('Algorithm', True), 'crit': ('Critical', True)} +# Note: l is the number of bits, which should be a multiple of 16 +def encode_int(n, l): + e = hex(n).rstrip("L").lstrip("0x") + el = len(e) + L = ((l + 7) // 8) * 2 # number of bytes rounded up times 2 chars/bytes + if el > L: + e = e[:L] + else: + e = '0' * (L - el) + e # pad as necessary + return e.decode('hex') + + +def decode_int(n): + return int(n.encode('hex'), 16) + + class InvalidJWEData(Exception): def __init__(self, message=None, exception=None): msg = None @@ -61,6 +77,12 @@ class InvalidJWEOperation(Exception): super(InvalidJWEOperation, self).__init__(msg) +class InvalidJWEKeyType(Exception): + def __init__(self, expected, obtained): + msg = 'Expected key type %s, got %s' % (expected, obtained) + super(InvalidJWEKeyType, self).__init__(msg) + + class _raw_key_mgmt(object): def wrap(self, key, keylen, cek): @@ -75,7 +97,13 @@ class _rsa(_raw_key_mgmt): def __init__(self, padfn): self.padfn = padfn + def check_key(self, key): + if key.key_type != 'RSA': + raise InvalidJWEKeyType('RSA', key.key_type) + + # FIXME: get key size and insure > 2048 bits def wrap(self, key, keylen, cek): + self.check_key(key) if not cek: cek = os.urandom(keylen) rk = key.encrypt_key() @@ -83,14 +111,82 @@ class _rsa(_raw_key_mgmt): return (cek, ek) def unwrap(self, key, ek): + self.check_key(key) rk = key.decrypt_key() cek = rk.decrypt(ek, self.padfn) return cek +class _aes_kw(_raw_key_mgmt): + + def __init__(self, keysize): + self.backend = default_backend() + self.keysize = keysize + + def check_key(self, key): + if key.key_type != 'oct': + raise InvalidJWEKeyType('oct', key.key_type) + + def wrap(self, key, keylen, cek): + self.check_key(key) + if not cek: + cek = os.urandom(keylen) + rk = base64url_decode(key.encrypt_key()) + + # Implement RFC 3994 Key Unwrap - 2.2.2 + # TODO: Use cryptography once issue #1733 is resolved + iv = 'a6a6a6a6a6a6a6a6' + A = iv.decode('hex') + R = [cek[i:i+8] for i in range(0, len(cek), 8)] + n = len(R) + for j in range(0, 6): + for i in range(0, n): + e = Cipher(algorithms.AES(rk), modes.ECB(), + backend=self.backend).encryptor() + B = e.update(A + R[i]) + e.finalize() + A = encode_int(decode_int(B[:8]) ^ ((n*j)+i+1), 64) + R[i] = B[-8:] + ek = A + for i in range(0, n): + ek += R[i] + return (cek, ek) + + def unwrap(self, key, ek): + self.check_key(key) + rk = base64url_decode(key.decrypt_key()) + + # Implement RFC 3994 Key Unwrap - 2.2.3 + # TODO: Use cryptography once issue #1733 is resolved + iv = 'a6a6a6a6a6a6a6a6' + Aiv = iv.decode('hex') + + R = [ek[i:i+8] for i in range(0, len(ek), 8)] + A = R.pop(0) + n = len(R) + for j in range(5, -1, -1): + for i in range(n - 1, -1, -1): + AtR = encode_int((decode_int(A) ^ ((n*j)+i+1)), 64) + R[i] + d = Cipher(algorithms.AES(rk), modes.ECB(), + backend=self.backend).decryptor() + B = d.update(AtR) + d.finalize() + A = B[:8] + R[i] = B[-8:] + + if A != Aiv: + raise InvalidJWEData('Decryption Failed') + + cek = ''.join(R) + return cek + + class _direct(_raw_key_mgmt): + def check_key(self, key): + if key.key_type != 'oct': + raise InvalidJWEKeyType('oct', key.key_type) + def wrap(self, key, keylen, cek): + self.check_key(key) if cek: return (cek, None) k = base64url_decode(key.encrypt_key()) @@ -99,6 +195,7 @@ class _direct(_raw_key_mgmt): return (k, '') def unwrap(self, key, ek): + self.check_key(key) if ek != '': raise InvalidJWEData('Invalid Encryption Key.') return base64url_decode(key.decrypt_key()) @@ -106,12 +203,6 @@ class _direct(_raw_key_mgmt): class _raw_jwe(object): - def encode_int(self, n, l): - e = hex(n).rstrip("L").lstrip("0x") - L = (l + 7) / 8 # number of bytes rounded up - e = '0' * (L * 2 - len(e)) + e # pad as necessary - return e.decode('hex') - def encrypt(self, k, a, m): raise NotImplementedError @@ -131,7 +222,7 @@ class _aes_cbc_hmac_sha2(_raw_jwe): return self.blocksize * 2 def _mac(self, k, a, iv, e): - al = self.encode_int(len(a * 8), 64) + al = encode_int(len(a * 8), 64) h = hmac.HMAC(k, self.hashfn, backend=self.backend) h.update(a) h.update(iv) @@ -195,6 +286,54 @@ class _aes_cbc_hmac_sha2(_raw_jwe): return unpadder.update(d) + unpadder.finalize() +class _aes_gcm(_raw_jwe): + + def __init__(self, keybits): + self.backend = default_backend() + self.blocksize = keybits / 8 + + @property + def key_size(self): + return self.blocksize + + # draft-ietf-jose-json-web-algorithms-40 - 5.2.2 + def encrypt(self, k, a, m): + """ Encrypt accoriding to the selected encryption and hashing + functions. + + :param k: Encryption key (optional) + :param a: Additional Authentication Data + :param m: Plaintext + + Returns a dictionary with the computed data. + """ + iv = os.urandom(96 / 8) + cipher = Cipher(algorithms.AES(k), modes.GCM(iv), + backend=self.backend) + encryptor = cipher.encryptor() + encryptor.authenticate_additional_data(a) + e = encryptor.update(m) + encryptor.finalize() + + return (iv, e, encryptor.tag) + + def decrypt(self, k, a, iv, e, t): + """ Decrypt accoriding to the selected encryption and hashing + functions. + :param k: Encryption key (optional) + :param a: Additional Authenticated Data + :param iv: Initialization Vector + :param e: Ciphertext + :param t: Authentication Tag + + Returns plaintext or raises an error + """ + cipher = Cipher(algorithms.AES(k), modes.GCM(iv, t), + backend=self.backend) + decryptor = cipher.decryptor() + decryptor.authenticate_additional_data(a) + return decryptor.update(e) + decryptor.finalize() + + class JWE(object): def __init__(self, plaintext=None, protected=None, unprotected=None, @@ -224,6 +363,19 @@ class JWE(object): def _jwa_RSA1_5(self): return _rsa(padding.PKCS1v15()) + def _jwa_RSA_OAEP(self): + return _rsa(padding.OAEP(padding.MGF1(hashes.SHA1()), + hashes.SHA1(), + None)) + + def _jwa_RSA_OAEP_256(self): + return _rsa(padding.OAEP(padding.MGF1(hashes.SHA256()), + hashes.SHA256(), + None)) + + def _jwa_A128KW(self): + return _aes_kw(128) + def _jwa_dir(self): return _direct() @@ -231,6 +383,21 @@ class JWE(object): def _jwa_A128CBC_HS256(self): return _aes_cbc_hmac_sha2(hashes.SHA256(), 128) + def _jwa_A192CBC_HS384(self): + return _aes_cbc_hmac_sha2(hashes.SHA384(), 192) + + def _jwa_A256CBC_HS512(self): + return _aes_cbc_hmac_sha2(hashes.SHA512(), 256) + + def _jwa_A128GCM(self): + return _aes_gcm(128) + + def _jwa_A192GCM(self): + return _aes_gcm(192) + + def _jwa_A256GCM(self): + return _aes_gcm(256) + def _jwa(self, name): attr = '_jwa_%s' % name.replace('-', '_').replace('+', '_') try: diff --git a/jwcrypto/jwk.py b/jwcrypto/jwk.py index f689930..8583ff1 100644 --- a/jwcrypto/jwk.py +++ b/jwcrypto/jwk.py @@ -154,6 +154,10 @@ class JWK(object): d.update(self._key) return json.dumps(d) + @property + def key_type(self): + return self._params.get('kty', None) + @property def key_id(self): return self._params.get('kid', None) diff --git a/jwcrypto/tests.py b/jwcrypto/tests.py index 181ca6f..e17a295 100644 --- a/jwcrypto/tests.py +++ b/jwcrypto/tests.py @@ -437,6 +437,61 @@ class TestJWS(unittest.TestCase): self.assertEqual(False, S.objects['valid']) +E_A1_plaintext = \ + [84, 104, 101, 32, 116, 114, 117, 101, 32, 115, 105, 103, 110, 32, + 111, 102, 32, 105, 110, 116, 101, 108, 108, 105, 103, 101, 110, 99, + 101, 32, 105, 115, 32, 110, 111, 116, 32, 107, 110, 111, 119, 108, + 101, 100, 103, 101, 32, 98, 117, 116, 32, 105, 109, 97, 103, 105, + 110, 97, 116, 105, 111, 110, 46] +E_A1_protected = "eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00ifQ" +E_A1_key = \ + {"kty": "RSA", + "n": "oahUIoWw0K0usKNuOR6H4wkf4oBUXHTxRvgb48E-BVvxkeDNjbC4he8rUW" + "cJoZmds2h7M70imEVhRU5djINXtqllXI4DFqcI1DgjT9LewND8MW2Krf3S" + "psk_ZkoFnilakGygTwpZ3uesH-PFABNIUYpOiN15dsQRkgr0vEhxN92i2a" + "sbOenSZeyaxziK72UwxrrKoExv6kc5twXTq4h-QChLOln0_mtUZwfsRaMS" + "tPs6mS6XrgxnxbWhojf663tuEQueGC-FCMfra36C9knDFGzKsNa7LZK2dj" + "YgyD3JR_MB_4NUJW_TqOQtwHYbxevoJArm-L5StowjzGy-_bq6Gw", + "e": "AQAB", + "d": "kLdtIj6GbDks_ApCSTYQtelcNttlKiOyPzMrXHeI-yk1F7-kpDxY4-WY5N" + "WV5KntaEeXS1j82E375xxhWMHXyvjYecPT9fpwR_M9gV8n9Hrh2anTpTD9" + "3Dt62ypW3yDsJzBnTnrYu1iwWRgBKrEYY46qAZIrA2xAwnm2X7uGR1hghk" + "qDp0Vqj3kbSCz1XyfCs6_LehBwtxHIyh8Ripy40p24moOAbgxVw3rxT_vl" + "t3UVe4WO3JkJOzlpUf-KTVI2Ptgm-dARxTEtE-id-4OJr0h-K-VFs3VSnd" + "VTIznSxfyrj8ILL6MG_Uv8YAu7VILSB3lOW085-4qE3DzgrTjgyQ", + "p": "1r52Xk46c-LsfB5P442p7atdPUrxQSy4mti_tZI3Mgf2EuFVbUoDBvaRQ-" + "SWxkbkmoEzL7JXroSBjSrK3YIQgYdMgyAEPTPjXv_hI2_1eTSPVZfzL0lf" + "fNn03IXqWF5MDFuoUYE0hzb2vhrlN_rKrbfDIwUbTrjjgieRbwC6Cl0", + "q": "wLb35x7hmQWZsWJmB_vle87ihgZ19S8lBEROLIsZG4ayZVe9Hi9gDVCOBm" + "UDdaDYVTSNx_8Fyw1YYa9XGrGnDew00J28cRUoeBB_jKI1oma0Orv1T9aX" + "IWxKwd4gvxFImOWr3QRL9KEBRzk2RatUBnmDZJTIAfwTs0g68UZHvtc", + "dp": "ZK-YwE7diUh0qR1tR7w8WHtolDx3MZ_OTowiFvgfeQ3SiresXjm9gZ5KL" + "hMXvo-uz-KUJWDxS5pFQ_M0evdo1dKiRTjVw_x4NyqyXPM5nULPkcpU827" + "rnpZzAJKpdhWAgqrXGKAECQH0Xt4taznjnd_zVpAmZZq60WPMBMfKcuE", + "dq": "Dq0gfgJ1DdFGXiLvQEZnuKEN0UUmsJBxkjydc3j4ZYdBiMRAy86x0vHCj" + "ywcMlYYg4yoC4YZa9hNVcsjqA3FeiL19rk8g6Qn29Tt0cj8qqyFpz9vNDB" + "UfCAiJVeESOjJDZPYHdHY8v1b-o-Z2X5tvLx-TCekf7oxyeKDUqKWjis", + "qi": "VIMpMYbPf47dT1w_zDUXfPimsSegnMOA1zTaX7aGk_8urY6R8-ZW1FxU7" + "AlWAyLWybqq6t16VFd7hQd0y6flUK4SlOydB61gwanOsXGOAOv82cHq0E3" + "eL4HrtZkUuKvnPrMnsUUFlfUdybVzxyjz9JF_XyaY14ardLSjf4L_FNY"} +E_A1_vector = \ + "eyJhbGciOiJSU0EtT0FFUCIsImVuYyI6IkEyNTZHQ00ifQ." \ + "OKOawDo13gRp2ojaHV7LFpZcgV7T6DVZKTyKOMTYUmKoTCVJRgckCL9kiMT03JGe" \ + "ipsEdY3mx_etLbbWSrFr05kLzcSr4qKAq7YN7e9jwQRb23nfa6c9d-StnImGyFDb" \ + "Sv04uVuxIp5Zms1gNxKKK2Da14B8S4rzVRltdYwam_lDp5XnZAYpQdb76FdIKLaV" \ + "mqgfwX7XWRxv2322i-vDxRfqNzo_tETKzpVLzfiwQyeyPGLBIO56YJ7eObdv0je8" \ + "1860ppamavo35UgoRdbYaBcoh9QcfylQr66oc6vFWXRcZ_ZT2LawVCWTIy3brGPi" \ + "6UklfCpIMfIjf7iGdXKHzg." \ + "48V1_ALb6US04U3b." \ + "5eym8TW_c8SuK0ltJ3rpYIzOeDQz7TALvtu6UG9oMo4vpzs9tX_EFShS8iB7j6ji" \ + "SdiwkIr3ajwQzaBtQD_A." \ + "XFBoMYUZodetZdvTiFvSkQ" + +E_A1_ex = {'key': jwk.JWK(**E_A1_key), # pylint: disable=star-args + 'protected': base64url_decode(E_A1_protected), + 'plaintext': E_A1_plaintext, + 'vector': E_A1_vector} + E_A2_plaintext = "Live long and prosper." E_A2_protected = "eyJhbGciOiJSU0ExXzUiLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0" E_A2_key = \ @@ -486,14 +541,96 @@ E_A2_ex = {'key': jwk.JWK(**E_A2_key), # pylint: disable=star-args 'plaintext': E_A2_plaintext, 'vector': E_A2_vector} +E_A3_plaintext = "Live long and prosper." +E_A3_protected = "eyJhbGciOiJBMTI4S1ciLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0" +E_A3_key = {"kty": "oct", "k": "GawgguFyGrWKav7AX4VKUg"} +E_A3_vector = \ + "eyJhbGciOiJBMTI4S1ciLCJlbmMiOiJBMTI4Q0JDLUhTMjU2In0." \ + "6KB707dM9YTIgHtLvtgWQ8mKwboJW3of9locizkDTHzBC2IlrT1oOQ." \ + "AxY8DCtDaGlsbGljb3RoZQ." \ + "KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY." \ + "U0m_YmjN04DJvceFICbCVQ" + +E_A3_ex = {'key': jwk.JWK(**E_A3_key), # pylint: disable=star-args + 'protected': base64url_decode(E_A3_protected), + 'plaintext': E_A3_plaintext, + 'vector': E_A3_vector} + +E_A4_protected = "eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0" +E_A4_unprotected = {"jku": "https://server.example.com/keys.jwks"} +E_A4_vector = \ + '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ + '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ + '"recipients":[' \ + '{"header":{"alg":"RSA1_5","kid":"2011-04-29"},' \ + '"encrypted_key":'\ + '"UGhIOguC7IuEvf_NPVaXsGMoLOmwvc1GyqlIKOK1nN94nHPoltGRhWhw7Zx0-' \ + 'kFm1NJn8LE9XShH59_i8J0PH5ZZyNfGy2xGdULU7sHNF6Gp2vPLgNZ__deLKx' \ + 'GHZ7PcHALUzoOegEI-8E66jX2E4zyJKx-YxzZIItRzC5hlRirb6Y5Cl_p-ko3' \ + 'YvkkysZIFNPccxRU7qve1WYPxqbb2Yw8kZqa2rMWI5ng8OtvzlV7elprCbuPh' \ + 'cCdZ6XDP0_F8rkXds2vE4X-ncOIM8hAYHHi29NX0mcKiRaD0-D-ljQTP-cFPg' \ + 'wCp6X-nZZd9OHBv-B3oWh2TbqmScqXMR4gp_A"},' \ + '{"header":{"alg":"A128KW","kid":"7"},' \ + '"encrypted_key":' \ + '"6KB707dM9YTIgHtLvtgWQ8mKwboJW3of9locizkDTHzBC2IlrT1oOQ"}],' \ + '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ + '"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \ + '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' + +E_A4_ex = {'key1': jwk.JWK(**E_A2_key), # pylint: disable=star-args + 'header1': '{"alg":"RSA1_5","kid":"2011-04-29"}', + 'key2': jwk.JWK(**E_A3_key), # pylint: disable=star-args + 'header2': '{"alg":"A128KW","kid":"7"}', + 'protected': base64url_decode(E_A4_protected), + 'unprotected': json.dumps(E_A4_unprotected), + 'plaintext': E_A3_plaintext, + 'vector': E_A4_vector} + +E_A5_ex = \ + '{"protected":"eyJlbmMiOiJBMTI4Q0JDLUhTMjU2In0",' \ + '"unprotected":{"jku":"https://server.example.com/keys.jwks"},' \ + '"header":{"alg":"A128KW","kid":"7"},' \ + '"encrypted_key":' \ + '"6KB707dM9YTIgHtLvtgWQ8mKwboJW3of9locizkDTHzBC2IlrT1oOQ",' \ + '"iv":"AxY8DCtDaGlsbGljb3RoZQ",' \ + '"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \ + '"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}' + class TestJWE(unittest.TestCase): - def test_A2(self): - E = jwe.JWE(E_A2_ex['plaintext'], E_A2_ex['protected']) - E.add_recipient(E_A2_ex['key']) + def check_enc(self, plaintext, protected, key, vector): + E = jwe.JWE(plaintext, protected) + E.add_recipient(key) # Encrypt and serialize using compact - e = E.serialize(compact=True) + e = E.serialize() # And test that we can decrypt our own - E.deserialize(e, E_A2_ex['key']) + E.deserialize(e, key) + # Now test the Spec Test Vector + E.deserialize(vector, key) + + def test_A1(self): + self.check_enc(E_A1_ex['plaintext'], E_A1_ex['protected'], + E_A1_ex['key'], E_A1_ex['vector']) + + def test_A2(self): + self.check_enc(E_A2_ex['plaintext'], E_A2_ex['protected'], + E_A2_ex['key'], E_A2_ex['vector']) + + def test_A3(self): + self.check_enc(E_A3_ex['plaintext'], E_A3_ex['protected'], + E_A3_ex['key'], E_A3_ex['vector']) + + def test_A4(self): + E = jwe.JWE(E_A4_ex['plaintext'], E_A4_ex['protected']) + E.add_recipient(E_A4_ex['key1'], E_A4_ex['header1']) + E.add_recipient(E_A4_ex['key2'], E_A4_ex['header2']) + e = E.serialize() + E.deserialize(e, E_A4_ex['key1']) + E.deserialize(e, E_A4_ex['key2']) # Now test the Spec Test Vector - E.deserialize(E_A2_ex['vector'], E_A2_ex['key']) + E.deserialize(E_A4_ex['vector'], E_A4_ex['key1']) + E.deserialize(E_A4_ex['vector'], E_A4_ex['key2']) + + def test_A5(self): + E = jwe.JWE() + E.deserialize(E_A5_ex) -- cgit