From 33f36ea10c1db2aaa74818c60933a20a9abe672f Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Wed, 18 Mar 2015 12:45:45 -0400 Subject: The protected header is optional in some cases Allow the use of a JWE without protected headers. Thanks to Jan Rusnacko for pointing out this flaw. Signed-off-by: Simo Sorce --- jwcrypto/jwe.py | 50 +++++++++++++++++++++++++------------------------- jwcrypto/tests.py | 5 +++++ 2 files changed, 30 insertions(+), 25 deletions(-) diff --git a/jwcrypto/jwe.py b/jwcrypto/jwe.py index 40f8edb..a44b7fe 100644 --- a/jwcrypto/jwe.py +++ b/jwcrypto/jwe.py @@ -399,8 +399,8 @@ class JWE(object): return _aes_gcm(256) def _jwa(self, name): - attr = '_jwa_%s' % name.replace('-', '_').replace('+', '_') try: + attr = '_jwa_%s' % name.replace('-', '_').replace('+', '_') return getattr(self, attr)() except (KeyError, AttributeError): raise InvalidJWAAlgorithm() @@ -412,11 +412,24 @@ class JWE(object): h1.update(h2) return h1 + def get_jose_header(self, header=None): + jh = dict() + if 'protected' in self.objects: + ph = json.loads(self.objects['protected']) + jh = self.merge_headers(jh, ph) + if 'unprotected' in self.objects: + uh = json.loads(self.objects['unprotected']) + jh = self.merge_headers(jh, uh) + if header: + rh = json.loads(header) + jh = self.merge_headers(jh, rh) + return jh + def add_recipient(self, key, header=None): """ Encrypt the provided payload with the given key. :param key: A JWK key of appropriate type for the "alg" - provided in the 'protected' json string. + provided in the JOSE Headers. See draft-ietf-jose-json-web-key-41 :param header: A JSON string representing the per-recipient header. @@ -426,16 +439,9 @@ class JWE(object): if not isinstance(key, JWK): raise ValueError('key is not a JWK object') - ph = json.loads(self.objects['protected']) - if 'unprotected' in self.objects: - uh = json.loads(self.objects['unprotected']) - ph = self.merge_headers(ph, uh) - if header: - rh = json.loads(header) - ph = self.merge_headers(ph, rh) - - alg = self._jwa(ph.get('alg', None)) - enc = self._jwa(ph.get('enc', None)) + jh = self.get_jose_header(header) + alg = self._jwa(jh.get('alg', None)) + enc = self._jwa(jh.get('enc', None)) rec = dict() if header: @@ -450,7 +456,7 @@ class JWE(object): if 'aad' in self.objects: aad += '.' + base64url_encode(self.objects['aad']) - compress = ph.get('zip', None) + compress = jh.get('zip', None) if compress == 'DEF': data = zlib.compress(self.plaintext)[2:-4] elif compress is None: @@ -537,19 +543,13 @@ class JWE(object): # FIXME: allow to specify which algorithms to accept as valid def decrypt(self, key, ppe): - ph = json.loads(self.objects['protected']) - if 'unprotected' in self.objects: - uh = json.loads(self.objects['unprotected']) - ph = self.merge_headers(ph, uh) - if 'header' in ppe: - rh = json.loads(ppe['header']) - ph = self.merge_headers(ph, rh) + jh = self.get_jose_header(ppe.get('header', None)) + # TODO: allow caller to specify list of headers it understands - if 'crit' in ph: - self.check_crit(ph['crit']) + self.check_crit(jh.get('crit', dict())) - alg = self._jwa(ph.get('alg', None)) - enc = self._jwa(ph.get('enc', None)) + alg = self._jwa(jh.get('alg', None)) + enc = self._jwa(jh.get('enc', None)) aad = base64url_encode(self.objects.get('protected', '')) if 'aad' in self.objects: @@ -563,7 +563,7 @@ class JWE(object): self.decryptlog.append('Success') self.cek = cek - compress = ph.get('zip', None) + compress = jh.get('zip', None) if compress == 'DEF': self.plaintext = zlib.decompress(data, -zlib.MAX_WBITS) elif compress is None: diff --git a/jwcrypto/tests.py b/jwcrypto/tests.py index 13dfeb7..d66a5b4 100644 --- a/jwcrypto/tests.py +++ b/jwcrypto/tests.py @@ -648,3 +648,8 @@ class ConformanceTests(unittest.TestCase): kty='RSA', n=1, key_ops=['sign'], use='enc') self.assertRaises(jwk.InvalidJWKValue, jwk.JWK, kty='RSA', n=1, key_ops=['sign', 'sign']) + + def test_jwe_no_protected_header(self): + enc = jwe.JWE(plaintext='plain') + enc.add_recipient(jwk.JWK(kty='oct', k='A'*32), + '{"alg":"A128KW","enc":"A128GCM"}') -- cgit