diff options
author | Simo Sorce <simo@redhat.com> | 2015-03-18 15:24:10 -0400 |
---|---|---|
committer | Simo Sorce <simo@redhat.com> | 2015-03-18 15:24:10 -0400 |
commit | 6352cc9d24b3ab714e210abeeb8b1d6c1b86cdad (patch) | |
tree | 389d2a92bdc54401d72b241a8927172fde8383b6 | |
parent | 9e1786f9c2700d907c7ebb99cd8e0f34822d5af7 (diff) | |
download | jwcrypto-6352cc9d24b3ab714e210abeeb8b1d6c1b86cdad.tar.gz jwcrypto-6352cc9d24b3ab714e210abeeb8b1d6c1b86cdad.tar.xz jwcrypto-6352cc9d24b3ab714e210abeeb8b1d6c1b86cdad.zip |
Fix AES blocksize handling and check keylengths
AES blocksize is always 16 regardles of key length naturally.
Fix the code to assume the proper blocksize and IV length.
Also add tests to check proper key length and add missing
A192KW and A256KW key wrapping algorythms.
Add tests to try encrypting with all AES key-length combinations.
Thanks to Jan Rusnacko for pointing out this flaw.
Signed-off-by: Simo Sorce <simo@redhat.com>
-rw-r--r-- | jwcrypto/jwe.py | 51 | ||||
-rw-r--r-- | jwcrypto/tests.py | 22 |
2 files changed, 53 insertions, 20 deletions
diff --git a/jwcrypto/jwe.py b/jwcrypto/jwe.py index b761ff9..c69d5a8 100644 --- a/jwcrypto/jwe.py +++ b/jwcrypto/jwe.py @@ -83,6 +83,12 @@ class InvalidJWEKeyType(Exception): super(InvalidJWEKeyType, self).__init__(msg) +class InvalidJWEKeyLength(Exception): + def __init__(self, expected, obtained): + msg = 'Expected key of lenght %d, got %d' % (expected, obtained) + super(InvalidJWEKeyLength, self).__init__(msg) + + class _raw_key_mgmt(object): def wrap(self, key, keylen, cek): @@ -121,17 +127,20 @@ class _aes_kw(_raw_key_mgmt): def __init__(self, keysize): self.backend = default_backend() - self.keysize = keysize + self.keysize = keysize / 8 - def check_key(self, key): + def get_key(self, key, op): if key.key_type != 'oct': raise InvalidJWEKeyType('oct', key.key_type) + rk = base64url_decode(key.get_op_key(op)) + if len(rk) != self.keysize: + raise InvalidJWEKeyLength(self.keysize * 8, len(rk) * 8) + return rk def wrap(self, key, keylen, cek): - self.check_key(key) + rk = self.get_key(key, 'encrypt') if not cek: cek = os.urandom(keylen) - rk = base64url_decode(key.get_op_key('encrypt')) # Implement RFC 3394 Key Unwrap - 2.2.2 # TODO: Use cryptography once issue #1733 is resolved @@ -152,8 +161,7 @@ class _aes_kw(_raw_key_mgmt): return (cek, ek) def unwrap(self, key, ek): - self.check_key(key) - rk = base64url_decode(key.get_op_key('decrypt')) + rk = self.get_key(key, 'decrypt') # Implement RFC 3394 Key Unwrap - 2.2.3 # TODO: Use cryptography once issue #1733 is resolved @@ -215,11 +223,12 @@ class _aes_cbc_hmac_sha2(_raw_jwe): def __init__(self, hashfn, keybits): self.backend = default_backend() self.hashfn = hashfn - self.blocksize = keybits / 8 + self.keysize = keybits / 8 + self.blocksize = algorithms.AES.block_size @property def key_size(self): - return self.blocksize * 2 + return self.keysize * 2 def _mac(self, k, a, iv, e): al = encode_int(len(a * 8), 64) @@ -229,7 +238,7 @@ class _aes_cbc_hmac_sha2(_raw_jwe): h.update(e) h.update(al) m = h.finalize() - return m[:self.blocksize] + return m[:self.keysize] # draft-ietf-jose-json-web-algorithms-40 - 5.2.2 def encrypt(self, k, a, m): @@ -242,15 +251,15 @@ class _aes_cbc_hmac_sha2(_raw_jwe): Returns a dictionary with the computed data. """ - hkey = k[:self.blocksize] - ekey = k[self.blocksize:] + hkey = k[:self.keysize] + ekey = k[self.keysize:] # encrypt - iv = os.urandom(self.blocksize) + iv = os.urandom(self.blocksize / 8) cipher = Cipher(algorithms.AES(ekey), modes.CBC(iv), backend=self.backend) encryptor = cipher.encryptor() - padder = PKCS7(self.blocksize * 8).padder() + padder = PKCS7(self.blocksize).padder() padded_data = padder.update(m) + padder.finalize() e = encryptor.update(padded_data) + encryptor.finalize() @@ -270,8 +279,8 @@ class _aes_cbc_hmac_sha2(_raw_jwe): Returns plaintext or raises an error """ - hkey = k[:self.blocksize] - dkey = k[self.blocksize:] + hkey = k[:self.keysize] + dkey = k[self.keysize:] # verify mac if not constant_time.bytes_eq(t, self._mac(hkey, a, iv, e)): @@ -282,7 +291,7 @@ class _aes_cbc_hmac_sha2(_raw_jwe): backend=self.backend) decryptor = cipher.decryptor() d = decryptor.update(e) + decryptor.finalize() - unpadder = PKCS7(self.blocksize * 8).unpadder() + unpadder = PKCS7(self.blocksize).unpadder() return unpadder.update(d) + unpadder.finalize() @@ -290,11 +299,11 @@ class _aes_gcm(_raw_jwe): def __init__(self, keybits): self.backend = default_backend() - self.blocksize = keybits / 8 + self.keysize = keybits / 8 @property def key_size(self): - return self.blocksize + return self.keysize # draft-ietf-jose-json-web-algorithms-40 - 5.2.2 def encrypt(self, k, a, m): @@ -376,6 +385,12 @@ class JWE(object): def _jwa_A128KW(self): return _aes_kw(128) + def _jwa_A192KW(self): + return _aes_kw(192) + + def _jwa_A256KW(self): + return _aes_kw(256) + def _jwa_dir(self): return _direct() diff --git a/jwcrypto/tests.py b/jwcrypto/tests.py index e8fcd6b..40fbbbc 100644 --- a/jwcrypto/tests.py +++ b/jwcrypto/tests.py @@ -1,6 +1,6 @@ # Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file -from jwcrypto.common import base64url_decode +from jwcrypto.common import base64url_decode, base64url_encode from jwcrypto import jwk from jwcrypto import jws from jwcrypto import jwe @@ -651,5 +651,23 @@ class ConformanceTests(unittest.TestCase): def test_jwe_no_protected_header(self): enc = jwe.JWE(plaintext='plain') - enc.add_recipient(jwk.JWK(kty='oct', k='A'*32), + enc.add_recipient(jwk.JWK(kty='oct', k=base64url_encode('A'*16)), '{"alg":"A128KW","enc":"A128GCM"}') + + def test_aes_128(self): + enc = jwe.JWE(plaintext='plain') + key128 = jwk.JWK(kty='oct', k=base64url_encode('A' * (128 / 8))) + enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128CBC-HS256"}') + enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128GCM"}') + + def test_aes_192(self): + enc = jwe.JWE(plaintext='plain') + key192 = jwk.JWK(kty='oct', k=base64url_encode('B' * (192 / 8))) + enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192CBC-HS384"}') + enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192GCM"}') + + def test_aes_256(self): + enc = jwe.JWE(plaintext='plain') + key256 = jwk.JWK(kty='oct', k=base64url_encode('C' * (256 / 8))) + enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256CBC-HS512"}') + enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256GCM"}') |